mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 13:02:14 +00:00
Use PATCH instead of PUT when update events
This commit is contained in:
parent
123a5bd465
commit
b443f1264c
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package record
|
package record
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
@ -27,6 +28,7 @@ import (
|
|||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util"
|
"k8s.io/kubernetes/pkg/util"
|
||||||
|
"k8s.io/kubernetes/pkg/util/strategicpatch"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -45,6 +47,7 @@ const maxQueuedEvents = 1000
|
|||||||
type EventSink interface {
|
type EventSink interface {
|
||||||
Create(event *api.Event) (*api.Event, error)
|
Create(event *api.Event) (*api.Event, error)
|
||||||
Update(event *api.Event) (*api.Event, error)
|
Update(event *api.Event) (*api.Event, error)
|
||||||
|
Patch(oldEvent *api.Event, data []byte) (*api.Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventRecorder knows how to record events on behalf of an EventSource.
|
// EventRecorder knows how to record events on behalf of an EventSource.
|
||||||
@ -111,19 +114,26 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
|
|||||||
// Events are safe to copy like this.
|
// Events are safe to copy like this.
|
||||||
eventCopy := *event
|
eventCopy := *event
|
||||||
event = &eventCopy
|
event = &eventCopy
|
||||||
|
var patch []byte
|
||||||
previousEvent := eventCache.getEvent(event)
|
previousEvent := eventCache.getEvent(event)
|
||||||
updateExistingEvent := previousEvent.Count > 0
|
updateExistingEvent := previousEvent.Count > 0
|
||||||
if updateExistingEvent {
|
if updateExistingEvent {
|
||||||
event.Count = previousEvent.Count + 1
|
// we still need to copy Name because the Patch relies on the Name to find the target event
|
||||||
event.FirstTimestamp = previousEvent.FirstTimestamp
|
|
||||||
event.Name = previousEvent.Name
|
event.Name = previousEvent.Name
|
||||||
event.ResourceVersion = previousEvent.ResourceVersion
|
event.Count = previousEvent.Count + 1
|
||||||
|
|
||||||
|
// we need to make sure the Count and LastTimestamp are the only differences between event and the eventCopy2
|
||||||
|
eventCopy2 := *event
|
||||||
|
eventCopy2.Count = 0
|
||||||
|
eventCopy2.LastTimestamp = unversioned.NewTime(time.Unix(0, 0))
|
||||||
|
newData, _ := json.Marshal(event)
|
||||||
|
oldData, _ := json.Marshal(eventCopy2)
|
||||||
|
patch, _ = strategicpatch.CreateStrategicMergePatch(oldData, newData, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
tries := 0
|
tries := 0
|
||||||
for {
|
for {
|
||||||
if recordEvent(sink, event, updateExistingEvent, eventCache) {
|
if recordEvent(sink, event, patch, updateExistingEvent, eventCache) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
tries++
|
tries++
|
||||||
@ -157,11 +167,11 @@ func isKeyNotFoundError(err error) bool {
|
|||||||
// was successfully recorded or discarded, false if it should be retried.
|
// was successfully recorded or discarded, false if it should be retried.
|
||||||
// If updateExistingEvent is false, it creates a new event, otherwise it updates
|
// If updateExistingEvent is false, it creates a new event, otherwise it updates
|
||||||
// existing event.
|
// existing event.
|
||||||
func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool, eventCache *historyCache) bool {
|
func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingEvent bool, eventCache *historyCache) bool {
|
||||||
var newEvent *api.Event
|
var newEvent *api.Event
|
||||||
var err error
|
var err error
|
||||||
if updateExistingEvent {
|
if updateExistingEvent {
|
||||||
newEvent, err = sink.Update(event)
|
newEvent, err = sink.Patch(event, patch)
|
||||||
}
|
}
|
||||||
// Update can fail because the event may have been removed and it no longer exists.
|
// Update can fail because the event may have been removed and it no longer exists.
|
||||||
if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
|
if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
|
||||||
@ -232,12 +242,13 @@ func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler fun
|
|||||||
|
|
||||||
// NewRecorder returns an EventRecorder that records events with the given event source.
|
// NewRecorder returns an EventRecorder that records events with the given event source.
|
||||||
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source api.EventSource) EventRecorder {
|
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source api.EventSource) EventRecorder {
|
||||||
return &recorderImpl{source, eventBroadcaster.Broadcaster}
|
return &recorderImpl{source, eventBroadcaster.Broadcaster, util.RealClock{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
type recorderImpl struct {
|
type recorderImpl struct {
|
||||||
source api.EventSource
|
source api.EventSource
|
||||||
*watch.Broadcaster
|
*watch.Broadcaster
|
||||||
|
clock util.Clock
|
||||||
}
|
}
|
||||||
|
|
||||||
func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unversioned.Time, reason, message string) {
|
func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unversioned.Time, reason, message string) {
|
||||||
@ -247,7 +258,7 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unv
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
event := makeEvent(ref, reason, message)
|
event := recorder.makeEvent(ref, reason, message)
|
||||||
event.Source = recorder.source
|
event.Source = recorder.source
|
||||||
|
|
||||||
recorder.Action(watch.Added, event)
|
recorder.Action(watch.Added, event)
|
||||||
@ -265,8 +276,8 @@ func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp unvers
|
|||||||
recorder.generateEvent(object, timestamp, reason, fmt.Sprintf(messageFmt, args...))
|
recorder.generateEvent(object, timestamp, reason, fmt.Sprintf(messageFmt, args...))
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {
|
func (recorder *recorderImpl) makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {
|
||||||
t := unversioned.Now()
|
t := unversioned.Time{recorder.clock.Now()}
|
||||||
namespace := ref.Namespace
|
namespace := ref.Namespace
|
||||||
if namespace == "" {
|
if namespace == "" {
|
||||||
namespace = api.NamespaceDefault
|
namespace = api.NamespaceDefault
|
||||||
|
@ -17,17 +17,20 @@ limitations under the License.
|
|||||||
package record
|
package record
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util"
|
"k8s.io/kubernetes/pkg/util"
|
||||||
|
"k8s.io/kubernetes/pkg/util/strategicpatch"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -38,6 +41,7 @@ func init() {
|
|||||||
type testEventSink struct {
|
type testEventSink struct {
|
||||||
OnCreate func(e *api.Event) (*api.Event, error)
|
OnCreate func(e *api.Event) (*api.Event, error)
|
||||||
OnUpdate func(e *api.Event) (*api.Event, error)
|
OnUpdate func(e *api.Event) (*api.Event, error)
|
||||||
|
OnPatch func(e *api.Event, p []byte) (*api.Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateEvent records the event for testing.
|
// CreateEvent records the event for testing.
|
||||||
@ -56,6 +60,50 @@ func (t *testEventSink) Update(e *api.Event) (*api.Event, error) {
|
|||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PatchEvent records the event for testing.
|
||||||
|
func (t *testEventSink) Patch(e *api.Event, p []byte) (*api.Event, error) {
|
||||||
|
if t.OnPatch != nil {
|
||||||
|
return t.OnPatch(e, p)
|
||||||
|
}
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type OnCreateFunc func(*api.Event) (*api.Event, error)
|
||||||
|
|
||||||
|
func OnCreateFactory(testCache map[string]*api.Event, createEvent chan<- *api.Event) OnCreateFunc {
|
||||||
|
return func(event *api.Event) (*api.Event, error) {
|
||||||
|
testCache[getEventKey(event)] = event
|
||||||
|
createEvent <- event
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type OnPatchFunc func(*api.Event, []byte) (*api.Event, error)
|
||||||
|
|
||||||
|
func OnPatchFactory(testCache map[string]*api.Event, patchEvent chan<- *api.Event) OnPatchFunc {
|
||||||
|
return func(event *api.Event, patch []byte) (*api.Event, error) {
|
||||||
|
cachedEvent, found := testCache[getEventKey(event)]
|
||||||
|
if !found {
|
||||||
|
return nil, fmt.Errorf("unexpected error: couldn't find Event in testCache. Try to find Event: %v", event)
|
||||||
|
}
|
||||||
|
originalData, err := json.Marshal(cachedEvent)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
patched, err := strategicpatch.StrategicMergePatch(originalData, patch, event)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
patchedObj := &api.Event{}
|
||||||
|
err = json.Unmarshal(patched, patchedObj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
patchEvent <- patchedObj
|
||||||
|
return patchedObj, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEventf(t *testing.T) {
|
func TestEventf(t *testing.T) {
|
||||||
testPod := &api.Pod{
|
testPod := &api.Pod{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
@ -270,24 +318,26 @@ func TestEventf(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
testCache := map[string]*api.Event{}
|
||||||
logCalled := make(chan struct{})
|
logCalled := make(chan struct{})
|
||||||
createEvent := make(chan *api.Event)
|
createEvent := make(chan *api.Event)
|
||||||
updateEvent := make(chan *api.Event)
|
updateEvent := make(chan *api.Event)
|
||||||
|
patchEvent := make(chan *api.Event)
|
||||||
testEvents := testEventSink{
|
testEvents := testEventSink{
|
||||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
OnCreate: OnCreateFactory(testCache, createEvent),
|
||||||
createEvent <- event
|
|
||||||
return event, nil
|
|
||||||
},
|
|
||||||
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
updateEvent <- event
|
updateEvent <- event
|
||||||
return event, nil
|
return event, nil
|
||||||
},
|
},
|
||||||
|
OnPatch: OnPatchFactory(testCache, patchEvent),
|
||||||
}
|
}
|
||||||
eventBroadcaster := NewBroadcaster()
|
eventBroadcaster := NewBroadcaster()
|
||||||
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
||||||
|
|
||||||
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
clock := &util.FakeClock{time.Now()}
|
||||||
|
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
|
clock.Step(1 * time.Second)
|
||||||
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
||||||
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
||||||
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
||||||
@ -301,7 +351,7 @@ func TestEventf(t *testing.T) {
|
|||||||
|
|
||||||
// validate event
|
// validate event
|
||||||
if item.expectUpdate {
|
if item.expectUpdate {
|
||||||
actualEvent := <-updateEvent
|
actualEvent := <-patchEvent
|
||||||
validateEvent(actualEvent, item.expect, t)
|
validateEvent(actualEvent, item.expect, t)
|
||||||
} else {
|
} else {
|
||||||
actualEvent := <-createEvent
|
actualEvent := <-createEvent
|
||||||
@ -348,6 +398,10 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.
|
|||||||
return actualEvent, nil
|
return actualEvent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBroadcaster, clock util.Clock) EventRecorder {
|
||||||
|
return &recorderImpl{eventSource, eventBroadcaster.(*eventBroadcasterImpl).Broadcaster, clock}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteEventError(t *testing.T) {
|
func TestWriteEventError(t *testing.T) {
|
||||||
ref := &api.ObjectReference{
|
ref := &api.ObjectReference{
|
||||||
Kind: "Pod",
|
Kind: "Pod",
|
||||||
@ -412,8 +466,10 @@ func TestWriteEventError(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
).Stop()
|
).Stop()
|
||||||
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
clock := &util.FakeClock{time.Now()}
|
||||||
|
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
|
||||||
for caseName := range table {
|
for caseName := range table {
|
||||||
|
clock.Step(1 * time.Second)
|
||||||
recorder.Event(ref, "Reason", caseName)
|
recorder.Event(ref, "Reason", caseName)
|
||||||
}
|
}
|
||||||
recorder.Event(ref, "Reason", "finished")
|
recorder.Event(ref, "Reason", "finished")
|
||||||
@ -528,25 +584,27 @@ func TestEventfNoNamespace(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
testCache := map[string]*api.Event{}
|
||||||
logCalled := make(chan struct{})
|
logCalled := make(chan struct{})
|
||||||
createEvent := make(chan *api.Event)
|
createEvent := make(chan *api.Event)
|
||||||
updateEvent := make(chan *api.Event)
|
updateEvent := make(chan *api.Event)
|
||||||
|
patchEvent := make(chan *api.Event)
|
||||||
testEvents := testEventSink{
|
testEvents := testEventSink{
|
||||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
OnCreate: OnCreateFactory(testCache, createEvent),
|
||||||
createEvent <- event
|
|
||||||
return event, nil
|
|
||||||
},
|
|
||||||
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
updateEvent <- event
|
updateEvent <- event
|
||||||
return event, nil
|
return event, nil
|
||||||
},
|
},
|
||||||
|
OnPatch: OnPatchFactory(testCache, patchEvent),
|
||||||
}
|
}
|
||||||
eventBroadcaster := NewBroadcaster()
|
eventBroadcaster := NewBroadcaster()
|
||||||
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
||||||
|
|
||||||
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
clock := &util.FakeClock{time.Now()}
|
||||||
|
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
|
clock.Step(1 * time.Second)
|
||||||
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
|
||||||
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
|
||||||
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
|
||||||
@ -560,7 +618,7 @@ func TestEventfNoNamespace(t *testing.T) {
|
|||||||
|
|
||||||
// validate event
|
// validate event
|
||||||
if item.expectUpdate {
|
if item.expectUpdate {
|
||||||
actualEvent := <-updateEvent
|
actualEvent := <-patchEvent
|
||||||
validateEvent(actualEvent, item.expect, t)
|
validateEvent(actualEvent, item.expect, t)
|
||||||
} else {
|
} else {
|
||||||
actualEvent := <-createEvent
|
actualEvent := <-createEvent
|
||||||
@ -787,42 +845,44 @@ func TestMultiSinkCache(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
testCache := map[string]*api.Event{}
|
||||||
createEvent := make(chan *api.Event)
|
createEvent := make(chan *api.Event)
|
||||||
updateEvent := make(chan *api.Event)
|
updateEvent := make(chan *api.Event)
|
||||||
|
patchEvent := make(chan *api.Event)
|
||||||
testEvents := testEventSink{
|
testEvents := testEventSink{
|
||||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
OnCreate: OnCreateFactory(testCache, createEvent),
|
||||||
createEvent <- event
|
|
||||||
return event, nil
|
|
||||||
},
|
|
||||||
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
updateEvent <- event
|
updateEvent <- event
|
||||||
return event, nil
|
return event, nil
|
||||||
},
|
},
|
||||||
|
OnPatch: OnPatchFactory(testCache, patchEvent),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
testCache2 := map[string]*api.Event{}
|
||||||
createEvent2 := make(chan *api.Event)
|
createEvent2 := make(chan *api.Event)
|
||||||
updateEvent2 := make(chan *api.Event)
|
updateEvent2 := make(chan *api.Event)
|
||||||
|
patchEvent2 := make(chan *api.Event)
|
||||||
testEvents2 := testEventSink{
|
testEvents2 := testEventSink{
|
||||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
OnCreate: OnCreateFactory(testCache2, createEvent2),
|
||||||
createEvent2 <- event
|
|
||||||
return event, nil
|
|
||||||
},
|
|
||||||
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
updateEvent2 <- event
|
updateEvent2 <- event
|
||||||
return event, nil
|
return event, nil
|
||||||
},
|
},
|
||||||
|
OnPatch: OnPatchFactory(testCache2, patchEvent2),
|
||||||
}
|
}
|
||||||
|
|
||||||
eventBroadcaster := NewBroadcaster()
|
eventBroadcaster := NewBroadcaster()
|
||||||
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"})
|
clock := &util.FakeClock{time.Now()}
|
||||||
|
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
|
||||||
|
|
||||||
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
|
clock.Step(1 * time.Second)
|
||||||
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
||||||
|
|
||||||
// validate event
|
// validate event
|
||||||
if item.expectUpdate {
|
if item.expectUpdate {
|
||||||
actualEvent := <-updateEvent
|
actualEvent := <-patchEvent
|
||||||
validateEvent(actualEvent, item.expect, t)
|
validateEvent(actualEvent, item.expect, t)
|
||||||
} else {
|
} else {
|
||||||
actualEvent := <-createEvent
|
actualEvent := <-createEvent
|
||||||
@ -833,11 +893,12 @@ func TestMultiSinkCache(t *testing.T) {
|
|||||||
// Another StartRecordingToSink call should start to record events with new clean cache.
|
// Another StartRecordingToSink call should start to record events with new clean cache.
|
||||||
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
|
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
|
clock.Step(1 * time.Second)
|
||||||
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
|
||||||
|
|
||||||
// validate event
|
// validate event
|
||||||
if item.expectUpdate {
|
if item.expectUpdate {
|
||||||
actualEvent := <-updateEvent2
|
actualEvent := <-patchEvent2
|
||||||
validateEvent(actualEvent, item.expect, t)
|
validateEvent(actualEvent, item.expect, t)
|
||||||
} else {
|
} else {
|
||||||
actualEvent := <-createEvent2
|
actualEvent := <-createEvent2
|
||||||
|
@ -35,6 +35,7 @@ type EventNamespacer interface {
|
|||||||
type EventInterface interface {
|
type EventInterface interface {
|
||||||
Create(event *api.Event) (*api.Event, error)
|
Create(event *api.Event) (*api.Event, error)
|
||||||
Update(event *api.Event) (*api.Event, error)
|
Update(event *api.Event) (*api.Event, error)
|
||||||
|
Patch(event *api.Event, data []byte) (*api.Event, error)
|
||||||
List(label labels.Selector, field fields.Selector) (*api.EventList, error)
|
List(label labels.Selector, field fields.Selector) (*api.EventList, error)
|
||||||
Get(name string) (*api.Event, error)
|
Get(name string) (*api.Event, error)
|
||||||
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
|
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
|
||||||
@ -98,6 +99,22 @@ func (e *events) Update(event *api.Event) (*api.Event, error) {
|
|||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Patch modifies an existing event. It returns the copy of the event that the server returns, or an
|
||||||
|
// error. The namespace and name of the target event is deduced from the incompleteEvent. The
|
||||||
|
// namespace must either match this event client's namespace, or this event client must have been
|
||||||
|
// created with the "" namespace.
|
||||||
|
func (e *events) Patch(incompleteEvent *api.Event, data []byte) (*api.Event, error) {
|
||||||
|
result := &api.Event{}
|
||||||
|
err := e.client.Patch(api.StrategicMergePatchType).
|
||||||
|
NamespaceIfScoped(incompleteEvent.Namespace, len(incompleteEvent.Namespace) > 0).
|
||||||
|
Resource("events").
|
||||||
|
Name(incompleteEvent.Name).
|
||||||
|
Body(data).
|
||||||
|
Do().
|
||||||
|
Into(result)
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
// List returns a list of events matching the selectors.
|
// List returns a list of events matching the selectors.
|
||||||
func (e *events) List(label labels.Selector, field fields.Selector) (*api.EventList, error) {
|
func (e *events) List(label labels.Selector, field fields.Selector) (*api.EventList, error) {
|
||||||
result := &api.EventList{}
|
result := &api.EventList{}
|
||||||
|
@ -100,6 +100,25 @@ func NewUpdateAction(resource, namespace string, object runtime.Object) UpdateAc
|
|||||||
return action
|
return action
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewRootPatchAction(resource string, object runtime.Object) PatchActionImpl {
|
||||||
|
action := PatchActionImpl{}
|
||||||
|
action.Verb = "patch"
|
||||||
|
action.Resource = resource
|
||||||
|
action.Object = object
|
||||||
|
|
||||||
|
return action
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPatchAction(resource, namespace string, object runtime.Object) PatchActionImpl {
|
||||||
|
action := PatchActionImpl{}
|
||||||
|
action.Verb = "patch"
|
||||||
|
action.Resource = resource
|
||||||
|
action.Namespace = namespace
|
||||||
|
action.Object = object
|
||||||
|
|
||||||
|
return action
|
||||||
|
}
|
||||||
|
|
||||||
func NewUpdateSubresourceAction(resource, subresource, namespace string, object runtime.Object) UpdateActionImpl {
|
func NewUpdateSubresourceAction(resource, subresource, namespace string, object runtime.Object) UpdateActionImpl {
|
||||||
action := UpdateActionImpl{}
|
action := UpdateActionImpl{}
|
||||||
action.Verb = "update"
|
action.Verb = "update"
|
||||||
@ -289,6 +308,15 @@ func (a UpdateActionImpl) GetObject() runtime.Object {
|
|||||||
return a.Object
|
return a.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PatchActionImpl struct {
|
||||||
|
ActionImpl
|
||||||
|
Object runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a PatchActionImpl) GetObject() runtime.Object {
|
||||||
|
return a.Object
|
||||||
|
}
|
||||||
|
|
||||||
type DeleteActionImpl struct {
|
type DeleteActionImpl struct {
|
||||||
ActionImpl
|
ActionImpl
|
||||||
Name string
|
Name string
|
||||||
|
@ -87,6 +87,20 @@ func (c *FakeEvents) Update(event *api.Event) (*api.Event, error) {
|
|||||||
return obj.(*api.Event), err
|
return obj.(*api.Event), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Patch patches an existing event. Returns the copy of the event the server returns, or an error.
|
||||||
|
func (c *FakeEvents) Patch(event *api.Event, data []byte) (*api.Event, error) {
|
||||||
|
action := NewRootPatchAction("events", event)
|
||||||
|
if c.Namespace != "" {
|
||||||
|
action = NewPatchAction("events", c.Namespace, event)
|
||||||
|
}
|
||||||
|
obj, err := c.Fake.Invokes(action, event)
|
||||||
|
if obj == nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj.(*api.Event), err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *FakeEvents) Delete(name string) error {
|
func (c *FakeEvents) Delete(name string) error {
|
||||||
action := NewRootDeleteAction("events", name)
|
action := NewRootDeleteAction("events", name)
|
||||||
if c.Namespace != "" {
|
if c.Namespace != "" {
|
||||||
|
Loading…
Reference in New Issue
Block a user