Merge pull request #78037 from yastij/fix-event-cleanup

clean singleton event when calling finishSeries

Kubernetes-commit: b0a81349effc68e19a6e063c8b7ebe0a4c8774e3
This commit is contained in:
Kubernetes Publisher 2019-05-29 21:28:56 -07:00
commit 5cefd29505
5 changed files with 162 additions and 17 deletions

4
Godeps/Godeps.json generated
View File

@ -188,11 +188,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "cf720200d547"
"Rev": "96378037bc4d"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "bff0c42413f8"
"Rev": "65c6e8495981"
},
{
"ImportPath": "k8s.io/klog",

8
go.mod
View File

@ -26,8 +26,8 @@ require (
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a
golang.org/x/time v0.0.0-20161028155119-f51c12702a4d
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.0.0-20190531132107-cf720200d547
k8s.io/apimachinery v0.0.0-20190529005236-bff0c42413f8
k8s.io/api v0.0.0-20190529212817-96378037bc4d
k8s.io/apimachinery v0.0.0-20190531131812-65c6e8495981
k8s.io/klog v0.3.1
k8s.io/utils v0.0.0-20190221042446-c2654d5206da
sigs.k8s.io/yaml v1.1.0
@ -37,6 +37,6 @@ replace (
golang.org/x/sync => golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
golang.org/x/sys => golang.org/x/sys v0.0.0-20190209173611-3b5209105503
golang.org/x/tools => golang.org/x/tools v0.0.0-20190313210603-aa82965741a9
k8s.io/api => k8s.io/api v0.0.0-20190531132107-cf720200d547
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190529005236-bff0c42413f8
k8s.io/api => k8s.io/api v0.0.0-20190529212817-96378037bc4d
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190531131812-65c6e8495981
)

4
go.sum
View File

@ -91,8 +91,8 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
k8s.io/api v0.0.0-20190531132107-cf720200d547/go.mod h1:QMoKrFuqE2O+M+xCzcomo1EH5b0DgFDaKvisrkBaMfk=
k8s.io/apimachinery v0.0.0-20190529005236-bff0c42413f8/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA=
k8s.io/api v0.0.0-20190529212817-96378037bc4d/go.mod h1:QTFMweSwWjQ8mUX2YYyJaX0B/iQR1UBvNgLXG21bT5c=
k8s.io/apimachinery v0.0.0-20190531131812-65c6e8495981/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA=
k8s.io/klog v0.3.1 h1:RVgyDHY/kFKtLqh67NvEWIgkMneNoIrdkN0CxDSQc68=
k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86FIDppkbrEXdXlxU3a3BMI=

View File

@ -90,20 +90,20 @@ func (e *EventSinkImpl) Patch(event *v1beta1.Event, data []byte) (*v1beta1.Event
// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
return newBroadcaster(sink, defaultSleepDuration)
return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*v1beta1.Event{})
}
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaster {
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*v1beta1.Event) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
eventCache: map[eventKey]*v1beta1.Event{},
eventCache: eventCache,
sleepDuration: sleepDuration,
sink: sink,
}
}
// TODO: add test for refreshExistingEventSeries
// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
@ -117,19 +117,23 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
}
}
// TODO: add test for finishSeries
// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
eventSeries := event.Series
if eventSeries != nil {
if eventSeries.LastObservedTime.Time.Add(finishTime).Before(time.Now()) {
eventSerie := event.Series
if eventSerie != nil {
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
if _, retry := recordEvent(e.sink, event); !retry {
delete(e.eventCache, isomorphicKey)
}
}
} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
delete(e.eventCache, isomorphicKey)
}
}
}

View File

@ -28,6 +28,7 @@ import (
"k8s.io/api/events/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
)
@ -154,7 +155,7 @@ func TestEventSeriesf(t *testing.T) {
return event, nil
},
}
eventBroadcaster := newBroadcaster(&testEvents, 0)
eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*v1beta1.Event{})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
eventBroadcaster.StartRecordingToSink(stopCh)
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
@ -206,3 +207,143 @@ func validateEventSerie(messagePrefix string, expectedUpdate bool, actualEvent *
}
}
func TestFinishSeries(t *testing.T) {
hostname, _ := os.Hostname()
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/api/version/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
}
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
if err != nil {
t.Fatal(err)
}
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
if err != nil {
t.Fatal(err)
}
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
createEvent := make(chan *v1beta1.Event, 10)
updateEvent := make(chan *v1beta1.Event, 10)
patchEvent := make(chan *v1beta1.Event, 10)
testEvents := testEventSeriesSink{
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
createEvent <- event
return event, nil
},
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
// event we receive is already patched, usually the sink uses it
// only to retrieve the name and namespace, here we'll use it directly
patchEvent <- event
return event, nil
},
}
cache := map[eventKey]*v1beta1.Event{}
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
nonFinishedEvent := cachedEvent.DeepCopy()
nonFinishedEvent.ReportingController = "nonFinished-controller"
cachedEvent.Series = &v1beta1.EventSeries{
Count: 10,
LastObservedTime: LastObservedTime,
}
cache[getKey(cachedEvent)] = cachedEvent
cache[getKey(nonFinishedEvent)] = nonFinishedEvent
eventBroadcaster.finishSeries()
select {
case actualEvent := <-patchEvent:
t.Logf("validating event affected by patch request")
eventBroadcaster.mu.Lock()
defer eventBroadcaster.mu.Unlock()
if len(cache) != 1 {
t.Errorf("cache should be empty, but instead got a size of %v", len(cache))
}
if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
}
// check that we emitted only one event
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
}
}
func TestRefreshExistingEventSeries(t *testing.T) {
hostname, _ := os.Hostname()
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/api/version/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
}
regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
if err != nil {
t.Fatal(err)
}
related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
if err != nil {
t.Fatal(err)
}
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
createEvent := make(chan *v1beta1.Event, 10)
updateEvent := make(chan *v1beta1.Event, 10)
patchEvent := make(chan *v1beta1.Event, 10)
testEvents := testEventSeriesSink{
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
createEvent <- event
return event, nil
},
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
updateEvent <- event
return event, nil
},
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
// event we receive is already patched, usually the sink uses it
//only to retrieve the name and namespace, here we'll use it directly.
patchEvent <- event
return event, nil
},
}
cache := map[eventKey]*v1beta1.Event{}
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
cachedEvent.Series = &v1beta1.EventSeries{
Count: 10,
LastObservedTime: LastObservedTime,
}
cacheKey := getKey(cachedEvent)
cache[cacheKey] = cachedEvent
eventBroadcaster.refreshExistingEventSeries()
select {
case <-patchEvent:
t.Logf("validating event affected by patch request")
eventBroadcaster.mu.Lock()
defer eventBroadcaster.mu.Unlock()
if len(cache) != 1 {
t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
}
// check that we emitted only one event
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
}
}