Merge pull request #15268 from caesarxuchao/fix-14126

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-10-13 19:30:17 -07:00
commit dfb400e2e9
5 changed files with 166 additions and 35 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package record
import (
"encoding/json"
"fmt"
"math/rand"
"time"
@ -27,6 +28,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
@ -45,6 +47,7 @@ const maxQueuedEvents = 1000
type EventSink interface {
Create(event *api.Event) (*api.Event, error)
Update(event *api.Event) (*api.Event, error)
Patch(oldEvent *api.Event, data []byte) (*api.Event, error)
}
// EventRecorder knows how to record events on behalf of an EventSource.
@ -111,19 +114,26 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
var patch []byte
previousEvent := eventCache.getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
event.Count = previousEvent.Count + 1
event.FirstTimestamp = previousEvent.FirstTimestamp
// we still need to copy Name because the Patch relies on the Name to find the target event
event.Name = previousEvent.Name
event.ResourceVersion = previousEvent.ResourceVersion
event.Count = previousEvent.Count + 1
// we need to make sure the Count and LastTimestamp are the only differences between event and the eventCopy2
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = unversioned.NewTime(time.Unix(0, 0))
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, _ = strategicpatch.CreateStrategicMergePatch(oldData, newData, event)
}
tries := 0
for {
if recordEvent(sink, event, updateExistingEvent, eventCache) {
if recordEvent(sink, event, patch, updateExistingEvent, eventCache) {
break
}
tries++
@ -157,11 +167,11 @@ func isKeyNotFoundError(err error) bool {
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool, eventCache *historyCache) bool {
func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingEvent bool, eventCache *historyCache) bool {
var newEvent *api.Event
var err error
if updateExistingEvent {
newEvent, err = sink.Update(event)
newEvent, err = sink.Patch(event, patch)
}
// Update can fail because the event may have been removed and it no longer exists.
if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
@ -232,12 +242,13 @@ func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler fun
// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source api.EventSource) EventRecorder {
return &recorderImpl{source, eventBroadcaster.Broadcaster}
return &recorderImpl{source, eventBroadcaster.Broadcaster, util.RealClock{}}
}
type recorderImpl struct {
source api.EventSource
*watch.Broadcaster
clock util.Clock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unversioned.Time, reason, message string) {
@ -247,7 +258,7 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unv
return
}
event := makeEvent(ref, reason, message)
event := recorder.makeEvent(ref, reason, message)
event.Source = recorder.source
recorder.Action(watch.Added, event)
@ -265,8 +276,8 @@ func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp unvers
recorder.generateEvent(object, timestamp, reason, fmt.Sprintf(messageFmt, args...))
}
func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {
t := unversioned.Now()
func (recorder *recorderImpl) makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {
t := unversioned.Time{recorder.clock.Now()}
namespace := ref.Namespace
if namespace == "" {
namespace = api.NamespaceDefault

View File

@ -17,17 +17,20 @@ limitations under the License.
package record
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/strategicpatch"
)
func init() {
@ -38,6 +41,7 @@ func init() {
type testEventSink struct {
OnCreate func(e *api.Event) (*api.Event, error)
OnUpdate func(e *api.Event) (*api.Event, error)
OnPatch func(e *api.Event, p []byte) (*api.Event, error)
}
// CreateEvent records the event for testing.
@ -56,6 +60,50 @@ func (t *testEventSink) Update(e *api.Event) (*api.Event, error) {
return e, nil
}
// PatchEvent records the event for testing.
func (t *testEventSink) Patch(e *api.Event, p []byte) (*api.Event, error) {
if t.OnPatch != nil {
return t.OnPatch(e, p)
}
return e, nil
}
type OnCreateFunc func(*api.Event) (*api.Event, error)
func OnCreateFactory(testCache map[string]*api.Event, createEvent chan<- *api.Event) OnCreateFunc {
return func(event *api.Event) (*api.Event, error) {
testCache[getEventKey(event)] = event
createEvent <- event
return event, nil
}
}
type OnPatchFunc func(*api.Event, []byte) (*api.Event, error)
func OnPatchFactory(testCache map[string]*api.Event, patchEvent chan<- *api.Event) OnPatchFunc {
return func(event *api.Event, patch []byte) (*api.Event, error) {
cachedEvent, found := testCache[getEventKey(event)]
if !found {
return nil, fmt.Errorf("unexpected error: couldn't find Event in testCache. Try to find Event: %v", event)
}
originalData, err := json.Marshal(cachedEvent)
if err != nil {
return nil, fmt.Errorf("unexpected error: %v", err)
}
patched, err := strategicpatch.StrategicMergePatch(originalData, patch, event)
if err != nil {
return nil, fmt.Errorf("unexpected error: %v", err)
}
patchedObj := &api.Event{}
err = json.Unmarshal(patched, patchedObj)
if err != nil {
return nil, fmt.Errorf("unexpected error: %v", err)
}
patchEvent <- patchedObj
return patchedObj, nil
}
}
func TestEventf(t *testing.T) {
testPod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@ -270,24 +318,26 @@ func TestEventf(t *testing.T) {
},
}
testCache := map[string]*api.Event{}
logCalled := make(chan struct{})
createEvent := make(chan *api.Event)
updateEvent := make(chan *api.Event)
patchEvent := make(chan *api.Event)
testEvents := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
createEvent <- event
return event, nil
},
OnCreate: OnCreateFactory(testCache, createEvent),
OnUpdate: func(event *api.Event) (*api.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for _, item := range table {
clock.Step(1 * time.Second)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
@ -301,7 +351,7 @@ func TestEventf(t *testing.T) {
// validate event
if item.expectUpdate {
actualEvent := <-updateEvent
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
@ -348,6 +398,10 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.
return actualEvent, nil
}
func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBroadcaster, clock util.Clock) EventRecorder {
return &recorderImpl{eventSource, eventBroadcaster.(*eventBroadcasterImpl).Broadcaster, clock}
}
func TestWriteEventError(t *testing.T) {
ref := &api.ObjectReference{
Kind: "Pod",
@ -412,8 +466,10 @@ func TestWriteEventError(t *testing.T) {
},
},
).Stop()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for caseName := range table {
clock.Step(1 * time.Second)
recorder.Event(ref, "Reason", caseName)
}
recorder.Event(ref, "Reason", "finished")
@ -528,25 +584,27 @@ func TestEventfNoNamespace(t *testing.T) {
},
}
testCache := map[string]*api.Event{}
logCalled := make(chan struct{})
createEvent := make(chan *api.Event)
updateEvent := make(chan *api.Event)
patchEvent := make(chan *api.Event)
testEvents := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
createEvent <- event
return event, nil
},
OnCreate: OnCreateFactory(testCache, createEvent),
OnUpdate: func(event *api.Event) (*api.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for _, item := range table {
clock.Step(1 * time.Second)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
@ -560,7 +618,7 @@ func TestEventfNoNamespace(t *testing.T) {
// validate event
if item.expectUpdate {
actualEvent := <-updateEvent
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
@ -787,42 +845,44 @@ func TestMultiSinkCache(t *testing.T) {
},
}
testCache := map[string]*api.Event{}
createEvent := make(chan *api.Event)
updateEvent := make(chan *api.Event)
patchEvent := make(chan *api.Event)
testEvents := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
createEvent <- event
return event, nil
},
OnCreate: OnCreateFactory(testCache, createEvent),
OnUpdate: func(event *api.Event) (*api.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: OnPatchFactory(testCache, patchEvent),
}
testCache2 := map[string]*api.Event{}
createEvent2 := make(chan *api.Event)
updateEvent2 := make(chan *api.Event)
patchEvent2 := make(chan *api.Event)
testEvents2 := testEventSink{
OnCreate: func(event *api.Event) (*api.Event, error) {
createEvent2 <- event
return event, nil
},
OnCreate: OnCreateFactory(testCache2, createEvent2),
OnUpdate: func(event *api.Event) (*api.Event, error) {
updateEvent2 <- event
return event, nil
},
OnPatch: OnPatchFactory(testCache2, patchEvent2),
}
eventBroadcaster := NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
for _, item := range table {
clock.Step(1 * time.Second)
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
// validate event
if item.expectUpdate {
actualEvent := <-updateEvent
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
@ -833,11 +893,12 @@ func TestMultiSinkCache(t *testing.T) {
// Another StartRecordingToSink call should start to record events with new clean cache.
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
for _, item := range table {
clock.Step(1 * time.Second)
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
// validate event
if item.expectUpdate {
actualEvent := <-updateEvent2
actualEvent := <-patchEvent2
validateEvent(actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent2

View File

@ -35,6 +35,7 @@ type EventNamespacer interface {
type EventInterface interface {
Create(event *api.Event) (*api.Event, error)
Update(event *api.Event) (*api.Event, error)
Patch(event *api.Event, data []byte) (*api.Event, error)
List(label labels.Selector, field fields.Selector) (*api.EventList, error)
Get(name string) (*api.Event, error)
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
@ -98,6 +99,22 @@ func (e *events) Update(event *api.Event) (*api.Event, error) {
return result, err
}
// Patch modifies an existing event. It returns the copy of the event that the server returns, or an
// error. The namespace and name of the target event is deduced from the incompleteEvent. The
// namespace must either match this event client's namespace, or this event client must have been
// created with the "" namespace.
func (e *events) Patch(incompleteEvent *api.Event, data []byte) (*api.Event, error) {
result := &api.Event{}
err := e.client.Patch(api.StrategicMergePatchType).
NamespaceIfScoped(incompleteEvent.Namespace, len(incompleteEvent.Namespace) > 0).
Resource("events").
Name(incompleteEvent.Name).
Body(data).
Do().
Into(result)
return result, err
}
// List returns a list of events matching the selectors.
func (e *events) List(label labels.Selector, field fields.Selector) (*api.EventList, error) {
result := &api.EventList{}

View File

@ -100,6 +100,25 @@ func NewUpdateAction(resource, namespace string, object runtime.Object) UpdateAc
return action
}
func NewRootPatchAction(resource string, object runtime.Object) PatchActionImpl {
action := PatchActionImpl{}
action.Verb = "patch"
action.Resource = resource
action.Object = object
return action
}
func NewPatchAction(resource, namespace string, object runtime.Object) PatchActionImpl {
action := PatchActionImpl{}
action.Verb = "patch"
action.Resource = resource
action.Namespace = namespace
action.Object = object
return action
}
func NewUpdateSubresourceAction(resource, subresource, namespace string, object runtime.Object) UpdateActionImpl {
action := UpdateActionImpl{}
action.Verb = "update"
@ -289,6 +308,15 @@ func (a UpdateActionImpl) GetObject() runtime.Object {
return a.Object
}
type PatchActionImpl struct {
ActionImpl
Object runtime.Object
}
func (a PatchActionImpl) GetObject() runtime.Object {
return a.Object
}
type DeleteActionImpl struct {
ActionImpl
Name string

View File

@ -87,6 +87,20 @@ func (c *FakeEvents) Update(event *api.Event) (*api.Event, error) {
return obj.(*api.Event), err
}
// Patch patches an existing event. Returns the copy of the event the server returns, or an error.
func (c *FakeEvents) Patch(event *api.Event, data []byte) (*api.Event, error) {
action := NewRootPatchAction("events", event)
if c.Namespace != "" {
action = NewPatchAction("events", c.Namespace, event)
}
obj, err := c.Fake.Invokes(action, event)
if obj == nil {
return nil, err
}
return obj.(*api.Event), err
}
func (c *FakeEvents) Delete(name string) error {
action := NewRootDeleteAction("events", name)
if c.Namespace != "" {