mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 12:07:47 +00:00
Improve client recording of events such that clients are
(1) less likely to drop events if the master is unavailable (2) less likely to have goroutines block while trying to record an event. Done as part of #3163 to ensure that minions operate well even while the master is down.
This commit is contained in:
parent
3ca6c231b2
commit
702a6f96b4
@ -18,6 +18,7 @@ package record
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
@ -25,13 +26,22 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// retryEventSleep is the time between record failures to retry. Available for test alteration.
|
||||
var retryEventSleep = 1 * time.Second
|
||||
const (
|
||||
maxQueuedEvents = 1000
|
||||
maxTriesPerEvent = 10
|
||||
)
|
||||
|
||||
var (
|
||||
minSleep = float64(1 * time.Second)
|
||||
maxSleep = float64(15 * time.Second)
|
||||
backoffExp = 1.5
|
||||
)
|
||||
|
||||
// EventRecorder knows how to store events (client.Client implements it.)
|
||||
// EventRecorder must respect the namespace that will be embedded in 'event'.
|
||||
@ -46,49 +56,82 @@ type EventRecorder interface {
|
||||
// or used to stop recording, if desired.
|
||||
// TODO: make me an object with parameterizable queue length and retry interval
|
||||
func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface {
|
||||
// Set up our own personal buffer of events so that we can clear out GetEvents'
|
||||
// broadcast channel as quickly as possible to avoid causing the relatively more
|
||||
// important event-producing goroutines from blocking while trying to insert events.
|
||||
eventQueue := make(chan *api.Event, maxQueuedEvents)
|
||||
|
||||
// Run a function in the background that grabs events off the queue and tries
|
||||
// to record them, retrying as appropriate to try to avoid dropping any.
|
||||
go func() {
|
||||
defer util.HandleCrash()
|
||||
for event := range eventQueue {
|
||||
tries := 0
|
||||
for {
|
||||
if recordEvent(recorder, event) {
|
||||
break
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
break
|
||||
}
|
||||
sleepDuration := time.Duration(
|
||||
math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1))))
|
||||
time.Sleep(wait.Jitter(sleepDuration, 0.5))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Finally, kick off the watcher that takes events from the channel and puts them
|
||||
// onto the queue.
|
||||
return GetEvents(func(event *api.Event) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
// Events are safe to copy like this.
|
||||
eventCopy := *event
|
||||
event = &eventCopy
|
||||
event.Source = source
|
||||
try := 0
|
||||
for {
|
||||
try++
|
||||
_, err := recorder.Create(event)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
// If we can't contact the server, then hold everything while we keep trying.
|
||||
// Otherwise, something about the event is malformed and we should abandon it.
|
||||
giveUp := false
|
||||
switch err.(type) {
|
||||
case *client.RequestConstructionError:
|
||||
// We will construct the request the same next time, so don't keep trying.
|
||||
giveUp = true
|
||||
case *errors.StatusError:
|
||||
// This indicates that the server understood and rejected our request.
|
||||
giveUp = true
|
||||
case *errors.UnexpectedObjectError:
|
||||
// We don't expect this; it implies the server's response didn't match a
|
||||
// known pattern. Go ahead and retry.
|
||||
default:
|
||||
// This case includes actual http transport errors. Go ahead and retry.
|
||||
}
|
||||
if giveUp {
|
||||
glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err)
|
||||
break
|
||||
}
|
||||
if try >= 3 {
|
||||
glog.Errorf("Unable to write event '%#v': '%v' (retry limit exceeded!)", event, err)
|
||||
break
|
||||
}
|
||||
glog.Errorf("Unable to write event: '%v' (will retry in 1 second)", err)
|
||||
time.Sleep(retryEventSleep)
|
||||
// Drop new events rather than old ones because the old ones may contain
|
||||
// some information explaining why everything is so backed up.
|
||||
if len(eventQueue) == maxQueuedEvents {
|
||||
glog.Errorf("Unable to write event '%#v' (event buffer full!)", event)
|
||||
} else {
|
||||
eventQueue <- event
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// recordEvent attempts to write event to recorder. It returns true if the event
|
||||
// was successfully recorded or discarded, false if it should be retried.
|
||||
func recordEvent(recorder EventRecorder, event *api.Event) bool {
|
||||
_, err := recorder.Create(event)
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
// If we can't contact the server, then hold everything while we keep trying.
|
||||
// Otherwise, something about the event is malformed and we should abandon it.
|
||||
giveUp := false
|
||||
switch err.(type) {
|
||||
case *client.RequestConstructionError:
|
||||
// We will construct the request the same next time, so don't keep trying.
|
||||
giveUp = true
|
||||
case *errors.StatusError:
|
||||
// This indicates that the server understood and rejected our request.
|
||||
giveUp = true
|
||||
case *errors.UnexpectedObjectError:
|
||||
// We don't expect this; it implies the server's response didn't match a
|
||||
// known pattern. Go ahead and retry.
|
||||
default:
|
||||
// This case includes actual http transport errors. Go ahead and retry.
|
||||
}
|
||||
if giveUp {
|
||||
glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err)
|
||||
return true
|
||||
}
|
||||
glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// StartLogging just logs local events, using the given logging function. The
|
||||
// return value can be ignored or used to stop logging, if desired.
|
||||
func StartLogging(logf func(format string, args ...interface{})) watch.Interface {
|
||||
|
@ -19,6 +19,7 @@ package record
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -31,7 +32,9 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
retryEventSleep = 1 * time.Microsecond
|
||||
// Don't bother sleeping between retries.
|
||||
minSleep = 0
|
||||
maxSleep = 0
|
||||
}
|
||||
|
||||
type testEventRecorder struct {
|
||||
@ -192,12 +195,12 @@ func TestWriteEventError(t *testing.T) {
|
||||
},
|
||||
"retry1": {
|
||||
timesToSendError: 1000,
|
||||
attemptsWanted: 3,
|
||||
attemptsWanted: 10,
|
||||
err: &errors.UnexpectedObjectError{},
|
||||
},
|
||||
"retry2": {
|
||||
timesToSendError: 1000,
|
||||
attemptsWanted: 3,
|
||||
attemptsWanted: 10,
|
||||
err: fmt.Errorf("A weird error"),
|
||||
},
|
||||
"succeedEventually": {
|
||||
@ -242,3 +245,54 @@ func TestWriteEventError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLotsOfEvents(t *testing.T) {
|
||||
recorderCalled := make(chan struct{})
|
||||
loggerCalled := make(chan struct{})
|
||||
|
||||
// Fail each event a few times to ensure there's some load on the tested code.
|
||||
var counts [1000]int
|
||||
testEvents := testEventRecorder{
|
||||
OnEvent: func(event *api.Event) (*api.Event, error) {
|
||||
num, err := strconv.Atoi(event.Message)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return event, nil
|
||||
}
|
||||
counts[num]++
|
||||
if counts[num] < 5 {
|
||||
return nil, fmt.Errorf("fake error")
|
||||
}
|
||||
recorderCalled <- struct{}{}
|
||||
return event, nil
|
||||
},
|
||||
}
|
||||
recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"})
|
||||
logger := StartLogging(func(formatter string, args ...interface{}) {
|
||||
loggerCalled <- struct{}{}
|
||||
})
|
||||
|
||||
ref := &api.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "v1beta1",
|
||||
}
|
||||
for i := 0; i < maxQueuedEvents; i++ {
|
||||
go Event(ref, "Status", "Reason", strconv.Itoa(i))
|
||||
}
|
||||
// Make sure no events were dropped by either of the listeners.
|
||||
for i := 0; i < maxQueuedEvents; i++ {
|
||||
<-recorderCalled
|
||||
<-loggerCalled
|
||||
}
|
||||
// Make sure that every event was attempted 5 times
|
||||
for i := 0; i < maxQueuedEvents; i++ {
|
||||
if counts[i] < 5 {
|
||||
t.Errorf("Only attempted to record event '%d' %d times.", i, counts[i])
|
||||
}
|
||||
}
|
||||
recorder.Stop()
|
||||
logger.Stop()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user