mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 23:15:14 +00:00
Merge pull request #81780 from yastij/fix-nil-panic
check that the recorded event is not nil on refreshExistingEventSeries
This commit is contained in:
commit
ab7c4a7c37
@ -42,6 +42,7 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -111,7 +111,9 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
||||
for isomorphicKey, event := range e.eventCache {
|
||||
if event.Series != nil {
|
||||
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||
e.eventCache[isomorphicKey] = recordedEvent
|
||||
if recordedEvent != nil {
|
||||
e.eventCache[isomorphicKey] = recordedEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
k8sruntime "k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
ref "k8s.io/client-go/tools/reference"
|
||||
)
|
||||
|
||||
@ -299,51 +300,72 @@ func TestRefreshExistingEventSeries(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
|
||||
|
||||
createEvent := make(chan *v1beta1.Event, 10)
|
||||
updateEvent := make(chan *v1beta1.Event, 10)
|
||||
patchEvent := make(chan *v1beta1.Event, 10)
|
||||
testEvents := testEventSeriesSink{
|
||||
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
createEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
updateEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||
// event we receive is already patched, usually the sink uses it
|
||||
//only to retrieve the name and namespace, here we'll use it directly.
|
||||
patchEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
}
|
||||
cache := map[eventKey]*v1beta1.Event{}
|
||||
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
||||
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
||||
cachedEvent.Series = &v1beta1.EventSeries{
|
||||
Count: 10,
|
||||
LastObservedTime: LastObservedTime,
|
||||
}
|
||||
cacheKey := getKey(cachedEvent)
|
||||
cache[cacheKey] = cachedEvent
|
||||
|
||||
eventBroadcaster.refreshExistingEventSeries()
|
||||
select {
|
||||
case <-patchEvent:
|
||||
t.Logf("validating event affected by patch request")
|
||||
eventBroadcaster.mu.Lock()
|
||||
defer eventBroadcaster.mu.Unlock()
|
||||
if len(cache) != 1 {
|
||||
t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
|
||||
table := []struct {
|
||||
patchFunc func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error)
|
||||
}{
|
||||
{
|
||||
patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||
// event we receive is already patched, usually the sink uses it
|
||||
//only to retrieve the name and namespace, here we'll use it directly.
|
||||
patchEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) {
|
||||
// we simulate an apiserver error here
|
||||
patchEvent <- nil
|
||||
return nil, &restclient.RequestConstructionError{}
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, item := range table {
|
||||
testEvents := testEventSeriesSink{
|
||||
OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
createEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) {
|
||||
updateEvent <- event
|
||||
return event, nil
|
||||
},
|
||||
OnPatch: item.patchFunc,
|
||||
}
|
||||
// check that we emitted only one event
|
||||
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
||||
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
||||
cache := map[eventKey]*v1beta1.Event{}
|
||||
eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl)
|
||||
cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
|
||||
cachedEvent.Series = &v1beta1.EventSeries{
|
||||
Count: 10,
|
||||
LastObservedTime: LastObservedTime,
|
||||
}
|
||||
cacheKey := getKey(cachedEvent)
|
||||
cache[cacheKey] = cachedEvent
|
||||
|
||||
eventBroadcaster.refreshExistingEventSeries()
|
||||
select {
|
||||
case <-patchEvent:
|
||||
t.Logf("validating event affected by patch request")
|
||||
eventBroadcaster.mu.Lock()
|
||||
defer eventBroadcaster.mu.Unlock()
|
||||
if len(cache) != 1 {
|
||||
t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
|
||||
}
|
||||
// check that we emitted only one event
|
||||
if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
|
||||
t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
|
||||
}
|
||||
cacheEvent, exists := cache[cacheKey]
|
||||
|
||||
if cacheEvent == nil || !exists {
|
||||
t.Errorf("expected event to exist and not being nil, but instead event: %v and exists: %v", cacheEvent, exists)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user