Merge pull request #3381 from a-robinson/events2

Improve client recording of events such that clients are

commit 2bee4ac572
@@ -18,6 +18,7 @@ package record
 import (
 	"fmt"
+	"math/rand"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -30,8 +31,9 @@ import (
 	"github.com/golang/glog"
 )
 
-// retryEventSleep is the time between record failures to retry. Available for test alteration.
-var retryEventSleep = 1 * time.Second
+const maxTriesPerEvent = 12
+
+var sleepDuration = 10 * time.Second
 
 // EventRecorder knows how to store events (client.Client implements it.)
 // EventRecorder must respect the namespace that will be embedded in 'event'.
@@ -46,49 +48,68 @@ type EventRecorder interface {
 // or used to stop recording, if desired.
 // TODO: make me an object with parameterizable queue length and retry interval
 func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface {
+	// The default math/rand package functions aren't thread safe, so create a
+	// new Rand object for each StartRecording call.
+	randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
 	return GetEvents(func(event *api.Event) {
 		// Make a copy before modification, because there could be multiple listeners.
 		// Events are safe to copy like this.
 		eventCopy := *event
 		event = &eventCopy
 		event.Source = source
-		try := 0
+		tries := 0
 		for {
-			try++
-			_, err := recorder.Create(event)
-			if err == nil {
+			if recordEvent(recorder, event) {
 				break
 			}
-			// If we can't contact the server, then hold everything while we keep trying.
-			// Otherwise, something about the event is malformed and we should abandon it.
-			giveUp := false
-			switch err.(type) {
-			case *client.RequestConstructionError:
-				// We will construct the request the same next time, so don't keep trying.
-				giveUp = true
-			case *errors.StatusError:
-				// This indicates that the server understood and rejected our request.
-				giveUp = true
-			case *errors.UnexpectedObjectError:
-				// We don't expect this; it implies the server's response didn't match a
-				// known pattern. Go ahead and retry.
-			default:
-				// This case includes actual http transport errors. Go ahead and retry.
-			}
-			if giveUp {
-				glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err)
+			tries++
+			if tries >= maxTriesPerEvent {
+				glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
 				break
 			}
-			if try >= 3 {
-				glog.Errorf("Unable to write event '%#v': '%v' (retry limit exceeded!)", event, err)
-				break
+			// Randomize the first sleep so that various clients won't all be
+			// synced up if the master goes down.
+			if tries == 1 {
+				time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
+			} else {
+				time.Sleep(sleepDuration)
 			}
-			glog.Errorf("Unable to write event: '%v' (will retry in 1 second)", err)
-			time.Sleep(retryEventSleep)
 		}
 	})
 }
 
+// recordEvent attempts to write event to recorder. It returns true if the event
+// was successfully recorded or discarded, false if it should be retried.
+func recordEvent(recorder EventRecorder, event *api.Event) bool {
+	_, err := recorder.Create(event)
+	if err == nil {
+		return true
+	}
+	// If we can't contact the server, then hold everything while we keep trying.
+	// Otherwise, something about the event is malformed and we should abandon it.
+	giveUp := false
+	switch err.(type) {
+	case *client.RequestConstructionError:
+		// We will construct the request the same next time, so don't keep trying.
+		giveUp = true
+	case *errors.StatusError:
+		// This indicates that the server understood and rejected our request.
+		giveUp = true
+	case *errors.UnexpectedObjectError:
+		// We don't expect this; it implies the server's response didn't match a
+		// known pattern. Go ahead and retry.
+	default:
+		// This case includes actual http transport errors. Go ahead and retry.
+	}
+	if giveUp {
+		glog.Errorf("Unable to write event '%#v': '%v' (will not retry!)", event, err)
+		return true
+	}
+	glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
+	return false
+}
+
 // StartLogging just logs local events, using the given logging function. The
 // return value can be ignored or used to stop logging, if desired.
 func StartLogging(logf func(format string, args ...interface{})) watch.Interface {
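Note: the retry policy introduced above can be read on its own: classify the error first (recordEvent gives up on malformed requests and retries transport errors), cap the attempts at maxTriesPerEvent, and randomize only the first sleep so that clients restarted by a master outage don't all retry in lockstep. Below is a minimal, self-contained sketch of that pattern; submitWithRetry, errMalformed, and baseSleep are hypothetical stand-ins for illustration, not part of the kubernetes client.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// errMalformed stands in for the "give up" error classes above
// (RequestConstructionError, StatusError); anything else is retried.
var errMalformed = errors.New("malformed request")

// submitWithRetry mirrors the loop in StartRecording: try up to maxTries
// times, sleep a randomized duration before the first retry so restarting
// clients don't retry in lockstep, and a fixed duration after that.
func submitWithRetry(submit func() error, maxTries int, baseSleep time.Duration, randGen *rand.Rand) error {
	tries := 0
	for {
		err := submit()
		if err == nil {
			return nil
		}
		if errors.Is(err, errMalformed) {
			return err // retrying won't help; abandon the item
		}
		tries++
		if tries >= maxTries {
			return fmt.Errorf("retry limit exceeded: %w", err)
		}
		if tries == 1 {
			time.Sleep(time.Duration(float64(baseSleep) * randGen.Float64()))
		} else {
			time.Sleep(baseSleep)
		}
	}
}

func main() {
	randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
	calls := 0
	err := submitWithRetry(func() error {
		calls++
		if calls < 3 {
			return errors.New("transient: connection refused")
		}
		return nil
	}, 12, 10*time.Millisecond, randGen)
	fmt.Println(calls, err) // 3 <nil>
}

Randomizing only the first interval is enough to de-synchronize the clients; a fixed interval after that keeps worst-case delivery latency predictable.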
@@ -120,9 +141,9 @@ func GetEvents(f func(*api.Event)) watch.Interface {
 	return w
 }
 
-const queueLen = 1000
+const maxQueuedEvents = 1000
 
-var events = watch.NewBroadcaster(queueLen)
+var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
 
 // Event constructs an event from the given information and puts it in the queue for sending.
 // 'object' is the object this event is about. Event will make a reference-- or you may also
@@ -19,9 +19,9 @@ package record
 import (
 	"fmt"
 	"reflect"
+	"strconv"
 	"strings"
 	"testing"
-	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@@ -31,7 +31,8 @@ import (
 )
 
 func init() {
-	retryEventSleep = 1 * time.Microsecond
+	// Don't bother sleeping between retries.
+	sleepDuration = 0
 }
 
 type testEventRecorder struct {
@@ -188,12 +189,12 @@ func TestWriteEventError(t *testing.T) {
 		},
 		"retry1": {
 			timesToSendError: 1000,
-			attemptsWanted:   3,
+			attemptsWanted:   12,
 			err:              &errors.UnexpectedObjectError{},
 		},
 		"retry2": {
 			timesToSendError: 1000,
-			attemptsWanted:   3,
+			attemptsWanted:   12,
 			err:              fmt.Errorf("A weird error"),
 		},
 		"succeedEventually": {
@@ -238,3 +239,54 @@ func TestWriteEventError(t *testing.T) {
 		}
 	}
 }
+
+func TestLotsOfEvents(t *testing.T) {
+	recorderCalled := make(chan struct{})
+	loggerCalled := make(chan struct{})
+
+	// Fail each event a few times to ensure there's some load on the tested code.
+	var counts [1000]int
+	testEvents := testEventRecorder{
+		OnEvent: func(event *api.Event) (*api.Event, error) {
+			num, err := strconv.Atoi(event.Message)
+			if err != nil {
+				t.Error(err)
+				return event, nil
+			}
+			counts[num]++
+			if counts[num] < 5 {
+				return nil, fmt.Errorf("fake error")
+			}
+			recorderCalled <- struct{}{}
+			return event, nil
+		},
+	}
+	recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"})
+	logger := StartLogging(func(formatter string, args ...interface{}) {
+		loggerCalled <- struct{}{}
+	})
+
+	ref := &api.ObjectReference{
+		Kind:       "Pod",
+		Name:       "foo",
+		Namespace:  "baz",
+		UID:        "bar",
+		APIVersion: "v1beta1",
+	}
+	for i := 0; i < maxQueuedEvents; i++ {
+		go Event(ref, "Status", "Reason", strconv.Itoa(i))
+	}
+	// Make sure no events were dropped by either of the listeners.
+	for i := 0; i < maxQueuedEvents; i++ {
+		<-recorderCalled
+		<-loggerCalled
+	}
+	// Make sure that every event was attempted 5 times
+	for i := 0; i < maxQueuedEvents; i++ {
+		if counts[i] < 5 {
+			t.Errorf("Only attempted to record event '%d' %d times.", i, counts[i])
+		}
+	}
+	recorder.Stop()
+	logger.Stop()
+}
@@ -39,7 +39,7 @@ type GenericRegistry struct {
 func NewGeneric(list runtime.Object) *GenericRegistry {
 	return &GenericRegistry{
 		ObjectList:  list,
-		Broadcaster: watch.NewBroadcaster(0),
+		Broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull),
 	}
 }
 
@@ -36,7 +36,7 @@ type PodRegistry struct {
 func NewPodRegistry(pods *api.PodList) *PodRegistry {
 	return &PodRegistry{
 		Pods:        pods,
-		broadcaster: watch.NewBroadcaster(0),
+		broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull),
 	}
 }
 
@@ -22,6 +22,20 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
 )
 
+// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
+// channel is full.
+type FullChannelBehavior int
+
+const (
+	WaitIfChannelFull FullChannelBehavior = iota
+	DropIfChannelFull
+)
+
+// Buffer the incoming queue a little bit even though it should rarely ever accumulate
+// anything, just in case a few events are received in such a short window that
+// Broadcaster can't move them onto the watchers' queues fast enough.
+const incomingQueueLength = 25
+
 // Broadcaster distributes event notifications among any number of watchers. Every event
 // is delivered to every watcher.
 type Broadcaster struct {
@@ -31,17 +45,27 @@ type Broadcaster struct {
 	nextWatcher int64
 
 	incoming chan Event
+
+	// How large to make watcher's channel.
+	watchQueueLength int
+	// If one of the watch channels is full, don't wait for it to become empty.
+	// Instead just deliver it to the watchers that do have space in their
+	// channels and move on to the next event.
+	// It's more fair to do this on a per-watcher basis than to do it on the
+	// "incoming" channel, which would allow one slow watcher to prevent all
+	// other watchers from getting new events.
+	fullChannelBehavior FullChannelBehavior
 }
 
-// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue.
-// When queueLength is 0, Action will block until any prior event has been
-// completely distributed. It is guaranteed that events will be distibuted in the
-// order in which they ocurr, but the order in which a single event is distributed
-// among all of the watchers is unspecified.
-func NewBroadcaster(queueLength int) *Broadcaster {
+// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
+// It is guaranteed that events will be distributed in the order in which they occur,
+// but the order in which a single event is distributed among all of the watchers is unspecified.
+func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
 	m := &Broadcaster{
 		watchers: map[int64]*broadcasterWatcher{},
-		incoming: make(chan Event, queueLength),
+		incoming: make(chan Event, incomingQueueLength),
+		watchQueueLength: queueLength,
+		fullChannelBehavior: fullChannelBehavior,
 	}
 	go m.loop()
 	return m
@@ -56,7 +80,7 @@ func (m *Broadcaster) Watch() Interface {
 	id := m.nextWatcher
 	m.nextWatcher++
 	w := &broadcasterWatcher{
-		result:  make(chan Event),
+		result:  make(chan Event, m.watchQueueLength),
 		stopped: make(chan struct{}),
 		id:      id,
 		m:       m,
@@ -119,10 +143,20 @@ func (m *Broadcaster) loop() {
 func (m *Broadcaster) distribute(event Event) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
-	for _, w := range m.watchers {
-		select {
-		case w.result <- event:
-		case <-w.stopped:
+	if m.fullChannelBehavior == DropIfChannelFull {
+		for _, w := range m.watchers {
+			select {
+			case w.result <- event:
+			case <-w.stopped:
+			default: // Don't block if the event can't be queued.
+			}
+		}
+	} else {
+		for _, w := range m.watchers {
+			select {
+			case w.result <- event:
+			case <-w.stopped:
+			}
 		}
 	}
 }
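Note: the only difference between the two branches of distribute is the default case in the select. In Go, a select with a default case never blocks, which is exactly what DropIfChannelFull needs, while omitting it makes the send wait until a receiver or buffer slot is available (WaitIfChannelFull). A tiny standalone sketch of the idiom; trySend is a hypothetical helper, not part of pkg/watch:

package main

import "fmt"

// trySend shows the two delivery strategies: with drop=true the select's
// default case fires when the channel is full, so the call never blocks;
// with drop=false the plain send blocks until there is room.
func trySend(ch chan int, v int, drop bool) bool {
	if drop {
		select {
		case ch <- v:
			return true
		default: // channel full: drop the value rather than block
			return false
		}
	}
	ch <- v // blocks until there is room
	return true
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1, true)) // true: buffer had room
	fmt.Println(trySend(ch, 2, true)) // false: buffer full, value dropped
}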
@@ -39,7 +39,7 @@ func TestBroadcaster(t *testing.T) {
 	}
 
 	// The broadcaster we're testing
-	m := NewBroadcaster(0)
+	m := NewBroadcaster(0, WaitIfChannelFull)
 
 	// Add a bunch of watchers
 	const testWatchers = 2
@@ -77,7 +77,7 @@ func TestBroadcaster(t *testing.T) {
 }
 
 func TestBroadcasterWatcherClose(t *testing.T) {
-	m := NewBroadcaster(0)
+	m := NewBroadcaster(0, WaitIfChannelFull)
 	w := m.Watch()
 	w2 := m.Watch()
 	w.Stop()
@@ -95,7 +95,7 @@ func TestBroadcasterWatcherClose(t *testing.T) {
 
 func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
 	done := make(chan bool)
-	m := NewBroadcaster(0)
+	m := NewBroadcaster(0, WaitIfChannelFull)
 	go func(w0, w1 Interface) {
 		// We know Broadcaster is in the distribute loop once one watcher receives
 		// an event. Stop the other watcher while distribute is trying to
@@ -116,3 +116,52 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
 	}
 	m.Shutdown()
 }
+
+func TestBroadcasterDropIfChannelFull(t *testing.T) {
+	m := NewBroadcaster(1, DropIfChannelFull)
+
+	event1 := Event{Added, &myType{"foo", "hello world 1"}}
+	event2 := Event{Added, &myType{"bar", "hello world 2"}}
+
+	// Add a couple watchers
+	const testWatchers = 2
+	watches := make([]Interface, testWatchers)
+	for i := 0; i < testWatchers; i++ {
+		watches[i] = m.Watch()
+	}
+
+	// Send a couple events before closing the broadcast channel.
+	t.Log("Sending event 1")
+	m.Action(event1.Type, event1.Object)
+	t.Log("Sending event 2")
+	m.Action(event2.Type, event2.Object)
+	m.Shutdown()
+
+	// Pull events from the queue.
+	wg := sync.WaitGroup{}
+	wg.Add(testWatchers)
+	for i := 0; i < testWatchers; i++ {
+		// Verify that each watcher only gets the first event because its watch
+		// queue of length one was full from the first one.
+		go func(watcher int, w Interface) {
+			defer wg.Done()
+			e1, ok := <-w.ResultChan()
+			if !ok {
+				t.Errorf("Watcher %v failed to retrieve first event.", watcher)
+				return
+			}
+			if e, a := event1, e1; !reflect.DeepEqual(e, a) {
+				t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
+					watcher, e.Type, e.Object, a.Type, a.Object)
+			} else {
+				t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
+			}
+			e2, ok := <-w.ResultChan()
+			if ok {
+				t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",
+					watcher, e2.Type, e2.Object)
+			}
+		}(i, watches[i])
+	}
+	wg.Wait()
+}
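Note: to see how the pieces fit together (a buffered incoming queue, a loop goroutine, bounded per-watcher queues, drop-if-full delivery), here is a toy, self-contained rendition of the Broadcaster's data flow. miniBroadcaster is illustrative only; the real type also supports WaitIfChannelFull, watcher registration, and Shutdown:

package main

import "fmt"

// miniBroadcaster is a toy rendition of Broadcaster: one buffered incoming
// queue drained by a loop goroutine, and a bounded queue per watcher.
type miniBroadcaster struct {
	incoming chan string
	watchers []chan string
	done     chan struct{}
}

func newMiniBroadcaster(watchQueueLength, watcherCount int) *miniBroadcaster {
	m := &miniBroadcaster{
		incoming: make(chan string, 25), // cf. incomingQueueLength
		done:     make(chan struct{}),
	}
	for i := 0; i < watcherCount; i++ {
		m.watchers = append(m.watchers, make(chan string, watchQueueLength))
	}
	go m.loop()
	return m
}

// loop mimics distribute with DropIfChannelFull: a full watcher queue
// loses the event instead of stalling every other watcher.
func (m *miniBroadcaster) loop() {
	for ev := range m.incoming {
		for _, w := range m.watchers {
			select {
			case w <- ev:
			default: // drop: this watcher's queue is full
			}
		}
	}
	for _, w := range m.watchers {
		close(w)
	}
	close(m.done)
}

func main() {
	m := newMiniBroadcaster(1, 2) // queue length 1, as in TestBroadcasterDropIfChannelFull
	m.incoming <- "event 1"
	m.incoming <- "event 2" // dropped: no watcher has drained "event 1" yet
	close(m.incoming)
	<-m.done
	for i, w := range m.watchers {
		for ev := range w {
			fmt.Printf("watcher %d got %q\n", i, ev)
		}
	}
}

Run as-is, it prints one line per watcher for "event 1"; "event 2" is dropped because each watcher's queue of length one is still full, mirroring TestBroadcasterDropIfChannelFull above.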