diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 06b0f61b81d..09b11a4933e 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -17,6 +17,7 @@ limitations under the License. package record import ( + "encoding/json" "fmt" "math/rand" "time" @@ -27,6 +28,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/strategicpatch" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -45,6 +47,7 @@ const maxQueuedEvents = 1000 type EventSink interface { Create(event *api.Event) (*api.Event, error) Update(event *api.Event) (*api.Event, error) + Patch(oldEvent *api.Event, data []byte) (*api.Event, error) } // EventRecorder knows how to record events on behalf of an EventSource. @@ -111,19 +114,26 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin // Events are safe to copy like this. eventCopy := *event event = &eventCopy - + var patch []byte previousEvent := eventCache.getEvent(event) updateExistingEvent := previousEvent.Count > 0 if updateExistingEvent { - event.Count = previousEvent.Count + 1 - event.FirstTimestamp = previousEvent.FirstTimestamp + // we still need to copy Name because the Patch relies on the Name to find the target event event.Name = previousEvent.Name - event.ResourceVersion = previousEvent.ResourceVersion + event.Count = previousEvent.Count + 1 + + // we need to make sure the Count and LastTimestamp are the only differences between event and the eventCopy2 + eventCopy2 := *event + eventCopy2.Count = 0 + eventCopy2.LastTimestamp = unversioned.NewTime(time.Unix(0, 0)) + newData, _ := json.Marshal(event) + oldData, _ := json.Marshal(eventCopy2) + patch, _ = strategicpatch.CreateStrategicMergePatch(oldData, newData, event) } tries := 0 for { - if recordEvent(sink, event, updateExistingEvent, eventCache) { + if recordEvent(sink, event, patch, updateExistingEvent, eventCache) { break } tries++ @@ -157,11 +167,11 @@ func isKeyNotFoundError(err error) bool { // was successfully recorded or discarded, false if it should be retried. // If updateExistingEvent is false, it creates a new event, otherwise it updates // existing event. -func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool, eventCache *historyCache) bool { +func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingEvent bool, eventCache *historyCache) bool { var newEvent *api.Event var err error if updateExistingEvent { - newEvent, err = sink.Update(event) + newEvent, err = sink.Patch(event, patch) } // Update can fail because the event may have been removed and it no longer exists. if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) { @@ -232,12 +242,13 @@ func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler fun // NewRecorder returns an EventRecorder that records events with the given event source. func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source api.EventSource) EventRecorder { - return &recorderImpl{source, eventBroadcaster.Broadcaster} + return &recorderImpl{source, eventBroadcaster.Broadcaster, util.RealClock{}} } type recorderImpl struct { source api.EventSource *watch.Broadcaster + clock util.Clock } func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unversioned.Time, reason, message string) { @@ -247,7 +258,7 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unv return } - event := makeEvent(ref, reason, message) + event := recorder.makeEvent(ref, reason, message) event.Source = recorder.source recorder.Action(watch.Added, event) @@ -265,8 +276,8 @@ func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp unvers recorder.generateEvent(object, timestamp, reason, fmt.Sprintf(messageFmt, args...)) } -func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event { - t := unversioned.Now() +func (recorder *recorderImpl) makeEvent(ref *api.ObjectReference, reason, message string) *api.Event { + t := unversioned.Time{recorder.clock.Now()} namespace := ref.Namespace if namespace == "" { namespace = api.NamespaceDefault diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index 33fc1f3a010..4a551dc23d4 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -17,17 +17,20 @@ limitations under the License. package record import ( + "encoding/json" "fmt" "reflect" "strconv" "strings" "testing" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/strategicpatch" ) func init() { @@ -38,6 +41,7 @@ func init() { type testEventSink struct { OnCreate func(e *api.Event) (*api.Event, error) OnUpdate func(e *api.Event) (*api.Event, error) + OnPatch func(e *api.Event, p []byte) (*api.Event, error) } // CreateEvent records the event for testing. @@ -56,6 +60,50 @@ func (t *testEventSink) Update(e *api.Event) (*api.Event, error) { return e, nil } +// PatchEvent records the event for testing. +func (t *testEventSink) Patch(e *api.Event, p []byte) (*api.Event, error) { + if t.OnPatch != nil { + return t.OnPatch(e, p) + } + return e, nil +} + +type OnCreateFunc func(*api.Event) (*api.Event, error) + +func OnCreateFactory(testCache map[string]*api.Event, createEvent chan<- *api.Event) OnCreateFunc { + return func(event *api.Event) (*api.Event, error) { + testCache[getEventKey(event)] = event + createEvent <- event + return event, nil + } +} + +type OnPatchFunc func(*api.Event, []byte) (*api.Event, error) + +func OnPatchFactory(testCache map[string]*api.Event, patchEvent chan<- *api.Event) OnPatchFunc { + return func(event *api.Event, patch []byte) (*api.Event, error) { + cachedEvent, found := testCache[getEventKey(event)] + if !found { + return nil, fmt.Errorf("unexpected error: couldn't find Event in testCache. Try to find Event: %v", event) + } + originalData, err := json.Marshal(cachedEvent) + if err != nil { + return nil, fmt.Errorf("unexpected error: %v", err) + } + patched, err := strategicpatch.StrategicMergePatch(originalData, patch, event) + if err != nil { + return nil, fmt.Errorf("unexpected error: %v", err) + } + patchedObj := &api.Event{} + err = json.Unmarshal(patched, patchedObj) + if err != nil { + return nil, fmt.Errorf("unexpected error: %v", err) + } + patchEvent <- patchedObj + return patchedObj, nil + } +} + func TestEventf(t *testing.T) { testPod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -270,24 +318,26 @@ func TestEventf(t *testing.T) { }, } + testCache := map[string]*api.Event{} logCalled := make(chan struct{}) createEvent := make(chan *api.Event) updateEvent := make(chan *api.Event) + patchEvent := make(chan *api.Event) testEvents := testEventSink{ - OnCreate: func(event *api.Event) (*api.Event, error) { - createEvent <- event - return event, nil - }, + OnCreate: OnCreateFactory(testCache, createEvent), OnUpdate: func(event *api.Event) (*api.Event, error) { updateEvent <- event return event, nil }, + OnPatch: OnPatchFactory(testCache, patchEvent), } eventBroadcaster := NewBroadcaster() sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) + clock := &util.FakeClock{time.Now()} + recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) for _, item := range table { + clock.Step(1 * time.Second) logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { @@ -301,7 +351,7 @@ func TestEventf(t *testing.T) { // validate event if item.expectUpdate { - actualEvent := <-updateEvent + actualEvent := <-patchEvent validateEvent(actualEvent, item.expect, t) } else { actualEvent := <-createEvent @@ -348,6 +398,10 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing. return actualEvent, nil } +func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBroadcaster, clock util.Clock) EventRecorder { + return &recorderImpl{eventSource, eventBroadcaster.(*eventBroadcasterImpl).Broadcaster, clock} +} + func TestWriteEventError(t *testing.T) { ref := &api.ObjectReference{ Kind: "Pod", @@ -412,8 +466,10 @@ func TestWriteEventError(t *testing.T) { }, }, ).Stop() - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) + clock := &util.FakeClock{time.Now()} + recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) for caseName := range table { + clock.Step(1 * time.Second) recorder.Event(ref, "Reason", caseName) } recorder.Event(ref, "Reason", "finished") @@ -528,25 +584,27 @@ func TestEventfNoNamespace(t *testing.T) { }, } + testCache := map[string]*api.Event{} logCalled := make(chan struct{}) createEvent := make(chan *api.Event) updateEvent := make(chan *api.Event) + patchEvent := make(chan *api.Event) testEvents := testEventSink{ - OnCreate: func(event *api.Event) (*api.Event, error) { - createEvent <- event - return event, nil - }, + OnCreate: OnCreateFactory(testCache, createEvent), OnUpdate: func(event *api.Event) (*api.Event, error) { updateEvent <- event return event, nil }, + OnPatch: OnPatchFactory(testCache, patchEvent), } eventBroadcaster := NewBroadcaster() sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) + clock := &util.FakeClock{time.Now()} + recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) for _, item := range table { + clock.Step(1 * time.Second) logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { @@ -560,7 +618,7 @@ func TestEventfNoNamespace(t *testing.T) { // validate event if item.expectUpdate { - actualEvent := <-updateEvent + actualEvent := <-patchEvent validateEvent(actualEvent, item.expect, t) } else { actualEvent := <-createEvent @@ -787,42 +845,44 @@ func TestMultiSinkCache(t *testing.T) { }, } + testCache := map[string]*api.Event{} createEvent := make(chan *api.Event) updateEvent := make(chan *api.Event) + patchEvent := make(chan *api.Event) testEvents := testEventSink{ - OnCreate: func(event *api.Event) (*api.Event, error) { - createEvent <- event - return event, nil - }, + OnCreate: OnCreateFactory(testCache, createEvent), OnUpdate: func(event *api.Event) (*api.Event, error) { updateEvent <- event return event, nil }, + OnPatch: OnPatchFactory(testCache, patchEvent), } + testCache2 := map[string]*api.Event{} createEvent2 := make(chan *api.Event) updateEvent2 := make(chan *api.Event) + patchEvent2 := make(chan *api.Event) testEvents2 := testEventSink{ - OnCreate: func(event *api.Event) (*api.Event, error) { - createEvent2 <- event - return event, nil - }, + OnCreate: OnCreateFactory(testCache2, createEvent2), OnUpdate: func(event *api.Event) (*api.Event, error) { updateEvent2 <- event return event, nil }, + OnPatch: OnPatchFactory(testCache2, patchEvent2), } eventBroadcaster := NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) + clock := &util.FakeClock{time.Now()} + recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) for _, item := range table { + clock.Step(1 * time.Second) recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) // validate event if item.expectUpdate { - actualEvent := <-updateEvent + actualEvent := <-patchEvent validateEvent(actualEvent, item.expect, t) } else { actualEvent := <-createEvent @@ -833,11 +893,12 @@ func TestMultiSinkCache(t *testing.T) { // Another StartRecordingToSink call should start to record events with new clean cache. sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2) for _, item := range table { + clock.Step(1 * time.Second) recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) // validate event if item.expectUpdate { - actualEvent := <-updateEvent2 + actualEvent := <-patchEvent2 validateEvent(actualEvent, item.expect, t) } else { actualEvent := <-createEvent2 diff --git a/pkg/client/unversioned/events.go b/pkg/client/unversioned/events.go index c37bcf17800..6094f9773b1 100644 --- a/pkg/client/unversioned/events.go +++ b/pkg/client/unversioned/events.go @@ -35,6 +35,7 @@ type EventNamespacer interface { type EventInterface interface { Create(event *api.Event) (*api.Event, error) Update(event *api.Event) (*api.Event, error) + Patch(event *api.Event, data []byte) (*api.Event, error) List(label labels.Selector, field fields.Selector) (*api.EventList, error) Get(name string) (*api.Event, error) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) @@ -98,6 +99,22 @@ func (e *events) Update(event *api.Event) (*api.Event, error) { return result, err } +// Patch modifies an existing event. It returns the copy of the event that the server returns, or an +// error. The namespace and name of the target event is deduced from the incompleteEvent. The +// namespace must either match this event client's namespace, or this event client must have been +// created with the "" namespace. +func (e *events) Patch(incompleteEvent *api.Event, data []byte) (*api.Event, error) { + result := &api.Event{} + err := e.client.Patch(api.StrategicMergePatchType). + NamespaceIfScoped(incompleteEvent.Namespace, len(incompleteEvent.Namespace) > 0). + Resource("events"). + Name(incompleteEvent.Name). + Body(data). + Do(). + Into(result) + return result, err +} + // List returns a list of events matching the selectors. func (e *events) List(label labels.Selector, field fields.Selector) (*api.EventList, error) { result := &api.EventList{} diff --git a/pkg/client/unversioned/testclient/actions.go b/pkg/client/unversioned/testclient/actions.go index ae0bde6d96d..04132dc25ee 100644 --- a/pkg/client/unversioned/testclient/actions.go +++ b/pkg/client/unversioned/testclient/actions.go @@ -100,6 +100,25 @@ func NewUpdateAction(resource, namespace string, object runtime.Object) UpdateAc return action } +func NewRootPatchAction(resource string, object runtime.Object) PatchActionImpl { + action := PatchActionImpl{} + action.Verb = "patch" + action.Resource = resource + action.Object = object + + return action +} + +func NewPatchAction(resource, namespace string, object runtime.Object) PatchActionImpl { + action := PatchActionImpl{} + action.Verb = "patch" + action.Resource = resource + action.Namespace = namespace + action.Object = object + + return action +} + func NewUpdateSubresourceAction(resource, subresource, namespace string, object runtime.Object) UpdateActionImpl { action := UpdateActionImpl{} action.Verb = "update" @@ -289,6 +308,15 @@ func (a UpdateActionImpl) GetObject() runtime.Object { return a.Object } +type PatchActionImpl struct { + ActionImpl + Object runtime.Object +} + +func (a PatchActionImpl) GetObject() runtime.Object { + return a.Object +} + type DeleteActionImpl struct { ActionImpl Name string diff --git a/pkg/client/unversioned/testclient/fake_events.go b/pkg/client/unversioned/testclient/fake_events.go index 23cdc62eb60..f8186e9a8e3 100644 --- a/pkg/client/unversioned/testclient/fake_events.go +++ b/pkg/client/unversioned/testclient/fake_events.go @@ -87,6 +87,20 @@ func (c *FakeEvents) Update(event *api.Event) (*api.Event, error) { return obj.(*api.Event), err } +// Patch patches an existing event. Returns the copy of the event the server returns, or an error. +func (c *FakeEvents) Patch(event *api.Event, data []byte) (*api.Event, error) { + action := NewRootPatchAction("events", event) + if c.Namespace != "" { + action = NewPatchAction("events", c.Namespace, event) + } + obj, err := c.Fake.Invokes(action, event) + if obj == nil { + return nil, err + } + + return obj.(*api.Event), err +} + func (c *FakeEvents) Delete(name string) error { action := NewRootDeleteAction("events", name) if c.Namespace != "" {