mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
clean singleton event when calling finishSeries
Signed-off-by: Yassine TIJANI <ytijani@vmware.com>
This commit is contained in:
parent
b49d429f64
commit
b6663e8d4f
@ -38,6 +38,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
||||||
],
|
],
|
||||||
|
@ -67,20 +67,20 @@ type eventBroadcasterImpl struct {
|
|||||||
|
|
||||||
// NewBroadcaster Creates a new event broadcaster.
|
// NewBroadcaster Creates a new event broadcaster.
|
||||||
func NewBroadcaster(sink EventSink) EventBroadcaster {
|
func NewBroadcaster(sink EventSink) EventBroadcaster {
|
||||||
return newBroadcaster(sink, defaultSleepDuration)
|
return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*v1beta1.Event{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
|
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
|
||||||
func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster {
|
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*v1beta1.Event) EventBroadcaster {
|
||||||
return &eventBroadcasterImpl{
|
return &eventBroadcasterImpl{
|
||||||
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
|
||||||
eventCache: map[eventKey]*v1beta1.Event{},
|
eventCache: eventCache,
|
||||||
sleepDuration: sleepDuration,
|
sleepDuration: sleepDuration,
|
||||||
sink: sink,
|
sink: sink,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add test for refreshExistingEventSeries
|
// refreshExistingEventSeries refresh events TTL
|
||||||
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
||||||
// TODO: Investigate whether lock contention won't be a problem
|
// TODO: Investigate whether lock contention won't be a problem
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
@ -94,19 +94,23 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add test for finishSeries
|
// finishSeries checks if a series has ended and either:
|
||||||
|
// - write final count to the apiserver
|
||||||
|
// - delete a singleton event (i.e. series field is nil) from the cache
|
||||||
func (e *eventBroadcasterImpl) finishSeries() {
|
func (e *eventBroadcasterImpl) finishSeries() {
|
||||||
// TODO: Investigate whether lock contention won't be a problem
|
// TODO: Investigate whether lock contention won't be a problem
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.mu.Unlock()
|
||||||
for isomorphicKey, event := range e.eventCache {
|
for isomorphicKey, event := range e.eventCache {
|
||||||
eventSeries := event.Series
|
eventSerie := event.Series
|
||||||
if eventSeries != nil {
|
if eventSerie != nil {
|
||||||
if eventSeries.LastObservedTime.Time.Add(finishTime).Before(time.Now()) {
|
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
|
||||||
if _, retry := recordEvent(e.sink, event); !retry {
|
if _, retry := recordEvent(e.sink, event); !retry {
|
||||||
delete(e.eventCache, isomorphicKey)
|
delete(e.eventCache, isomorphicKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
|
||||||
|
delete(e.eventCache, isomorphicKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"k8s.io/api/events/v1beta1"
|
"k8s.io/api/events/v1beta1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
ref "k8s.io/client-go/tools/reference"
|
ref "k8s.io/client-go/tools/reference"
|
||||||
)
|
)
|
||||||
@ -154,7 +155,7 @@ func TestEventSeriesf(t *testing.T) {
|
|||||||
return event, nil
|
return event, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
eventBroadcaster := newBroadcaster(&testEvents, 0)
|
eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*v1beta1.Event{})
|
||||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
|
||||||
eventBroadcaster.StartRecordingToSink(stopCh)
|
eventBroadcaster.StartRecordingToSink(stopCh)
|
||||||
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
|
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
|
||||||
@ -206,3 +207,143 @@ func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFinishSeries(t *testing.T) {
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
testPod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
SelfLink: "/api/version/pods/foo",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
|
||||||
|
|
||||||
|
createEvent := make(chan *v1beta1.Event, 10)
|
||||||
|
updateEvent := make(chan *v1beta1.Event, 10)
|
||||||
|
patchEvent := make(chan *v1beta1.Event, 10)
|
||||||
|
testEvents := testEventSeriesSink{
|
||||||
|
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
createEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
updateEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||||
|
// event we receive is already patched, usually the sink uses it
|
||||||
|
// only to retrieve the name and namespace, here we'll use it directly
|
||||||
|
patchEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cache := map[eventKey]*v1beta1.Event{}
|
||||||
|
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
||||||
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
||||||
|
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
||||||
|
nonFinishedEvent := cachedEvent.DeepCopy()
|
||||||
|
nonFinishedEvent.ReportingController = "nonFinished-controller"
|
||||||
|
cachedEvent.Series = &v1beta1.EventSeries{
|
||||||
|
Count: 10,
|
||||||
|
LastObservedTime: LastObservedTime,
|
||||||
|
}
|
||||||
|
cache[getKey(cachedEvent)] = cachedEvent
|
||||||
|
cache[getKey(nonFinishedEvent)] = nonFinishedEvent
|
||||||
|
eventBroadcaster.finishSeries()
|
||||||
|
select {
|
||||||
|
case actualEvent := <-patchEvent:
|
||||||
|
t.Logf("validating event affected by patch request")
|
||||||
|
eventBroadcaster.mu.Lock()
|
||||||
|
defer eventBroadcaster.mu.Unlock()
|
||||||
|
if len(cache) != 1 {
|
||||||
|
t.Errorf("cache should be empty, but instead got a size of %v", len(cache))
|
||||||
|
}
|
||||||
|
if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
|
||||||
|
t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
|
||||||
|
}
|
||||||
|
// check that we emitted only one event
|
||||||
|
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
||||||
|
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRefreshExistingEventSeries(t *testing.T) {
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
testPod := &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
SelfLink: "/api/version/pods/foo",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
|
||||||
|
|
||||||
|
createEvent := make(chan *v1beta1.Event, 10)
|
||||||
|
updateEvent := make(chan *v1beta1.Event, 10)
|
||||||
|
patchEvent := make(chan *v1beta1.Event, 10)
|
||||||
|
testEvents := testEventSeriesSink{
|
||||||
|
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
createEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||||
|
updateEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||||
|
// event we receive is already patched, usually the sink uses it
|
||||||
|
//only to retrieve the name and namespace, here we'll use it directly.
|
||||||
|
patchEvent <- event
|
||||||
|
return event, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cache := map[eventKey]*v1beta1.Event{}
|
||||||
|
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
||||||
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
||||||
|
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
||||||
|
cachedEvent.Series = &v1beta1.EventSeries{
|
||||||
|
Count: 10,
|
||||||
|
LastObservedTime: LastObservedTime,
|
||||||
|
}
|
||||||
|
cacheKey := getKey(cachedEvent)
|
||||||
|
cache[cacheKey] = cachedEvent
|
||||||
|
|
||||||
|
eventBroadcaster.refreshExistingEventSeries()
|
||||||
|
select {
|
||||||
|
case <-patchEvent:
|
||||||
|
t.Logf("validating event affected by patch request")
|
||||||
|
eventBroadcaster.mu.Lock()
|
||||||
|
defer eventBroadcaster.mu.Unlock()
|
||||||
|
if len(cache) != 1 {
|
||||||
|
t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
|
||||||
|
}
|
||||||
|
// check that we emitted only one event
|
||||||
|
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
||||||
|
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user