mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-22 05:10:32 +00:00
Use lock in eventBroadcasterImpl#refreshExistingEventSeries
Kubernetes-commit: 32241b0751c9b9bd5d061eae9a42bd88970d8478
This commit is contained in:
parent
7fc601088d
commit
c3d4ca8db6
@ -83,8 +83,8 @@ func newBroadcaster(sink EventSink, sleepDuration time.Duration) EventBroadcaste
|
|||||||
// TODO: add test for refreshExistingEventSeries
|
// TODO: add test for refreshExistingEventSeries
|
||||||
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
|
||||||
// TODO: Investigate whether lock contention won't be a problem
|
// TODO: Investigate whether lock contention won't be a problem
|
||||||
e.mu.RLock()
|
e.mu.Lock()
|
||||||
defer e.mu.RUnlock()
|
defer e.mu.Unlock()
|
||||||
for isomorphicKey, event := range e.eventCache {
|
for isomorphicKey, event := range e.eventCache {
|
||||||
if event.Series != nil {
|
if event.Series != nil {
|
||||||
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
|
||||||
@ -100,9 +100,9 @@ func (e *eventBroadcasterImpl) finishSeries() {
|
|||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.mu.Unlock()
|
||||||
for isomorphicKey, event := range e.eventCache {
|
for isomorphicKey, event := range e.eventCache {
|
||||||
eventSerie := event.Series
|
eventSeries := event.Series
|
||||||
if eventSerie != nil {
|
if eventSeries != nil {
|
||||||
if eventSerie.LastObservedTime.Time.Add(finishTime).Before(time.Now()) {
|
if eventSeries.LastObservedTime.Time.Add(finishTime).Before(time.Now()) {
|
||||||
if _, retry := recordEvent(e.sink, event); !retry {
|
if _, retry := recordEvent(e.sink, event); !retry {
|
||||||
delete(e.eventCache, isomorphicKey)
|
delete(e.eventCache, isomorphicKey)
|
||||||
}
|
}
|
||||||
@ -174,8 +174,8 @@ func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.E
|
|||||||
func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
|
func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
|
||||||
var newEvent *v1beta1.Event
|
var newEvent *v1beta1.Event
|
||||||
var err error
|
var err error
|
||||||
isEventSerie := event.Series != nil
|
isEventSeries := event.Series != nil
|
||||||
if isEventSerie {
|
if isEventSeries {
|
||||||
patch, err := createPatchBytesForSeries(event)
|
patch, err := createPatchBytesForSeries(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Unable to calculate diff, no merge is possible: %v", err)
|
klog.Errorf("Unable to calculate diff, no merge is possible: %v", err)
|
||||||
@ -184,7 +184,7 @@ func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) {
|
|||||||
newEvent, err = sink.Patch(event, patch)
|
newEvent, err = sink.Patch(event, patch)
|
||||||
}
|
}
|
||||||
// Update can fail because the event may have been removed and it no longer exists.
|
// Update can fail because the event may have been removed and it no longer exists.
|
||||||
if !isEventSerie || (isEventSerie && util.IsKeyNotFoundError(err)) {
|
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
|
||||||
// Making sure that ResourceVersion is empty on creation
|
// Making sure that ResourceVersion is empty on creation
|
||||||
event.ResourceVersion = ""
|
event.ResourceVersion = ""
|
||||||
newEvent, err = sink.Create(event)
|
newEvent, err = sink.Create(event)
|
||||||
|
Loading…
Reference in New Issue
Block a user