mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Optimize etcd storage by compressing recurring events in to a single event
This commit is contained in:
parent
52bf48cac2
commit
033577efa2
@ -34,6 +34,7 @@ type EventNamespacer interface {
|
|||||||
// EventInterface has methods to work with Event resources
|
// EventInterface has methods to work with Event resources
|
||||||
type EventInterface interface {
|
type EventInterface interface {
|
||||||
Create(event *api.Event) (*api.Event, error)
|
Create(event *api.Event) (*api.Event, error)
|
||||||
|
Update(event *api.Event) (*api.Event, error)
|
||||||
List(label, field labels.Selector) (*api.EventList, error)
|
List(label, field labels.Selector) (*api.EventList, error)
|
||||||
Get(name string) (*api.Event, error)
|
Get(name string) (*api.Event, error)
|
||||||
Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
|
Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
|
||||||
@ -73,6 +74,26 @@ func (e *events) Create(event *api.Event) (*api.Event, error) {
|
|||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update modifies an existing event. It returns the copy of the event that the server returns,
|
||||||
|
// or an error. The namespace and key to update the event within is deduced from the event. The
|
||||||
|
// namespace must either match this event client's namespace, or this event client must have been
|
||||||
|
// created with the "" namespace. Update also requires the ResourceVersion to be set in the event
|
||||||
|
// object.
|
||||||
|
func (e *events) Update(event *api.Event) (*api.Event, error) {
|
||||||
|
if len(event.ResourceVersion) == 0 {
|
||||||
|
return nil, fmt.Errorf("invalid event update object, missing resource version: %#v", event)
|
||||||
|
}
|
||||||
|
result := &api.Event{}
|
||||||
|
err := e.client.Put().
|
||||||
|
Namespace(event.Namespace).
|
||||||
|
Resource("events").
|
||||||
|
Name(event.Name).
|
||||||
|
Body(event).
|
||||||
|
Do().
|
||||||
|
Into(result)
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
// List returns a list of events matching the selectors.
|
// List returns a list of events matching the selectors.
|
||||||
func (e *events) List(label, field labels.Selector) (*api.EventList, error) {
|
func (e *events) List(label, field labels.Selector) (*api.EventList, error) {
|
||||||
result := &api.EventList{}
|
result := &api.EventList{}
|
||||||
|
@ -35,6 +35,12 @@ func (c *FakeEvents) Create(event *api.Event) (*api.Event, error) {
|
|||||||
return &api.Event{}, nil
|
return &api.Event{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update replaces an existing event. Returns the copy of the event the server returns, or an error.
|
||||||
|
func (c *FakeEvents) Update(event *api.Event) (*api.Event, error) {
|
||||||
|
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-event", Value: event.Name})
|
||||||
|
return &api.Event{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// List returns a list of events matching the selectors.
|
// List returns a list of events matching the selectors.
|
||||||
func (c *FakeEvents) List(label, field labels.Selector) (*api.EventList, error) {
|
func (c *FakeEvents) List(label, field labels.Selector) (*api.EventList, error) {
|
||||||
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "list-events"})
|
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "list-events"})
|
||||||
|
@ -41,6 +41,7 @@ var sleepDuration = 10 * time.Second
|
|||||||
// pkg/client's REST client.
|
// pkg/client's REST client.
|
||||||
type EventRecorder interface {
|
type EventRecorder interface {
|
||||||
Create(event *api.Event) (*api.Event, error)
|
Create(event *api.Event) (*api.Event, error)
|
||||||
|
Update(event *api.Event) (*api.Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartRecording starts sending events to recorder. Call once while initializing
|
// StartRecording starts sending events to recorder. Call once while initializing
|
||||||
@ -58,9 +59,18 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
|
|||||||
event = &eventCopy
|
event = &eventCopy
|
||||||
event.Source = source
|
event.Source = source
|
||||||
|
|
||||||
|
previousEvent := GetEvent(event)
|
||||||
|
updateExistingEvent := previousEvent.Count > 0
|
||||||
|
if updateExistingEvent {
|
||||||
|
event.Count = previousEvent.Count + 1
|
||||||
|
event.FirstTimestamp = previousEvent.FirstTimestamp
|
||||||
|
event.Name = previousEvent.Name
|
||||||
|
event.ResourceVersion = previousEvent.ResourceVersion
|
||||||
|
}
|
||||||
|
|
||||||
tries := 0
|
tries := 0
|
||||||
for {
|
for {
|
||||||
if recordEvent(recorder, event) {
|
if recordEvent(recorder, event, updateExistingEvent) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
tries++
|
tries++
|
||||||
@ -81,11 +91,21 @@ func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interf
|
|||||||
|
|
||||||
// recordEvent attempts to write event to recorder. It returns true if the event
|
// recordEvent attempts to write event to recorder. It returns true if the event
|
||||||
// was successfully recorded or discarded, false if it should be retried.
|
// was successfully recorded or discarded, false if it should be retried.
|
||||||
func recordEvent(recorder EventRecorder, event *api.Event) bool {
|
// If updateExistingEvent is false, it creates a new event, otherwise it updates
|
||||||
_, err := recorder.Create(event)
|
// existing event.
|
||||||
|
func recordEvent(recorder EventRecorder, event *api.Event, updateExistingEvent bool) bool {
|
||||||
|
var newEvent *api.Event
|
||||||
|
var err error
|
||||||
|
if updateExistingEvent {
|
||||||
|
newEvent, err = recorder.Update(event)
|
||||||
|
} else {
|
||||||
|
newEvent, err = recorder.Create(event)
|
||||||
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
AddOrUpdateEvent(newEvent)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we can't contact the server, then hold everything while we keep trying.
|
// If we can't contact the server, then hold everything while we keep trying.
|
||||||
// Otherwise, something about the event is malformed and we should abandon it.
|
// Otherwise, something about the event is malformed and we should abandon it.
|
||||||
giveUp := false
|
giveUp := false
|
||||||
|
@ -36,19 +36,24 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type testEventRecorder struct {
|
type testEventRecorder struct {
|
||||||
OnEvent func(e *api.Event) (*api.Event, error)
|
OnCreate func(e *api.Event) (*api.Event, error)
|
||||||
|
OnUpdate func(e *api.Event) (*api.Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateEvent records the event for testing.
|
// CreateEvent records the event for testing.
|
||||||
func (t *testEventRecorder) Create(e *api.Event) (*api.Event, error) {
|
func (t *testEventRecorder) Create(e *api.Event) (*api.Event, error) {
|
||||||
if t.OnEvent != nil {
|
if t.OnCreate != nil {
|
||||||
return t.OnEvent(e)
|
return t.OnCreate(e)
|
||||||
}
|
}
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *testEventRecorder) clearOnEvent() {
|
// UpdateEvent records the event for testing.
|
||||||
t.OnEvent = nil
|
func (t *testEventRecorder) Update(e *api.Event) (*api.Event, error) {
|
||||||
|
if t.OnUpdate != nil {
|
||||||
|
return t.OnUpdate(e)
|
||||||
|
}
|
||||||
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEventf(t *testing.T) {
|
func TestEventf(t *testing.T) {
|
||||||
@ -60,17 +65,28 @@ func TestEventf(t *testing.T) {
|
|||||||
UID: "bar",
|
UID: "bar",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
testPod2 := &api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
SelfLink: "/api/v1beta1/pods/foo",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
testRef, err := api.GetPartialReference(testPod, "desiredState.manifest.containers[2]")
|
testRef, err := api.GetPartialReference(testPod, "desiredState.manifest.containers[2]")
|
||||||
|
testRef2, err := api.GetPartialReference(testPod2, "desiredState.manifest.containers[3]")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
table := []struct {
|
table := []struct {
|
||||||
obj runtime.Object
|
obj runtime.Object
|
||||||
reason string
|
reason string
|
||||||
messageFmt string
|
messageFmt string
|
||||||
elements []interface{}
|
elements []interface{}
|
||||||
expect *api.Event
|
expect *api.Event
|
||||||
expectLog string
|
expectLog string
|
||||||
|
expectUpdate bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
obj: testRef,
|
obj: testRef,
|
||||||
@ -95,10 +111,36 @@ func TestEventf(t *testing.T) {
|
|||||||
Source: api.EventSource{Component: "eventTest"},
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
Count: 1,
|
Count: 1,
|
||||||
},
|
},
|
||||||
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): reason: 'Started' some verbose message: 1`,
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
obj: testPod,
|
obj: testPod,
|
||||||
|
reason: "Killed",
|
||||||
|
messageFmt: "some other verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
},
|
||||||
|
Reason: "Killed",
|
||||||
|
Message: "some other verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 1,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:""}): reason: 'Killed' some other verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef,
|
||||||
reason: "Started",
|
reason: "Started",
|
||||||
messageFmt: "some verbose message: %v",
|
messageFmt: "some verbose message: %v",
|
||||||
elements: []interface{}{1},
|
elements: []interface{}{1},
|
||||||
@ -113,37 +155,140 @@ func TestEventf(t *testing.T) {
|
|||||||
Namespace: "baz",
|
Namespace: "baz",
|
||||||
UID: "bar",
|
UID: "bar",
|
||||||
APIVersion: "v1beta1",
|
APIVersion: "v1beta1",
|
||||||
|
FieldPath: "desiredState.manifest.containers[2]",
|
||||||
|
},
|
||||||
|
Reason: "Started",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 2,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef2,
|
||||||
|
reason: "Started",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
FieldPath: "desiredState.manifest.containers[3]",
|
||||||
},
|
},
|
||||||
Reason: "Started",
|
Reason: "Started",
|
||||||
Message: "some verbose message: 1",
|
Message: "some verbose message: 1",
|
||||||
Source: api.EventSource{Component: "eventTest"},
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
Count: 1,
|
Count: 1,
|
||||||
},
|
},
|
||||||
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:""}): reason: 'Started' some verbose message: 1`,
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[3]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef,
|
||||||
|
reason: "Started",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "bar",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
FieldPath: "desiredState.manifest.containers[2]",
|
||||||
|
},
|
||||||
|
Reason: "Started",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 3,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): reason: 'Started' some verbose message: 1`,
|
||||||
|
expectUpdate: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef2,
|
||||||
|
reason: "Stopped",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
FieldPath: "desiredState.manifest.containers[3]",
|
||||||
|
},
|
||||||
|
Reason: "Stopped",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 1,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[3]"}): reason: 'Stopped' some verbose message: 1`,
|
||||||
|
expectUpdate: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
obj: testRef2,
|
||||||
|
reason: "Stopped",
|
||||||
|
messageFmt: "some verbose message: %v",
|
||||||
|
elements: []interface{}{1},
|
||||||
|
expect: &api.Event{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Pod",
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "baz",
|
||||||
|
UID: "differentUid",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
FieldPath: "desiredState.manifest.containers[3]",
|
||||||
|
},
|
||||||
|
Reason: "Stopped",
|
||||||
|
Message: "some verbose message: 1",
|
||||||
|
Source: api.EventSource{Component: "eventTest"},
|
||||||
|
Count: 2,
|
||||||
|
},
|
||||||
|
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[3]"}): reason: 'Stopped' some verbose message: 1`,
|
||||||
|
expectUpdate: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
called := make(chan struct{})
|
called := make(chan struct{})
|
||||||
testEvents := testEventRecorder{
|
testEvents := testEventRecorder{
|
||||||
OnEvent: func(event *api.Event) (*api.Event, error) {
|
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||||
a := *event
|
returnEvent, _ := validateEvent(event, item.expect, t)
|
||||||
// Just check that the timestamp was set.
|
if item.expectUpdate {
|
||||||
if a.FirstTimestamp.IsZero() || a.LastTimestamp.IsZero() {
|
t.Errorf("Expected event update(), got event create()")
|
||||||
t.Errorf("timestamp wasn't set")
|
|
||||||
}
|
|
||||||
a.FirstTimestamp = item.expect.FirstTimestamp
|
|
||||||
a.LastTimestamp = item.expect.LastTimestamp
|
|
||||||
// Check that name has the right prefix.
|
|
||||||
if n, en := a.Name, item.expect.Name; !strings.HasPrefix(n, en) {
|
|
||||||
t.Errorf("Name '%v' does not contain prefix '%v'", n, en)
|
|
||||||
}
|
|
||||||
a.Name = item.expect.Name
|
|
||||||
if e, a := item.expect, &a; !reflect.DeepEqual(e, a) {
|
|
||||||
t.Errorf("diff: %s", util.ObjectDiff(e, a))
|
|
||||||
}
|
}
|
||||||
called <- struct{}{}
|
called <- struct{}{}
|
||||||
return event, nil
|
return returnEvent, nil
|
||||||
|
},
|
||||||
|
OnUpdate: func(event *api.Event) (*api.Event, error) {
|
||||||
|
returnEvent, _ := validateEvent(event, item.expect, t)
|
||||||
|
if !item.expectUpdate {
|
||||||
|
t.Errorf("Expected event create(), got event update()")
|
||||||
|
}
|
||||||
|
called <- struct{}{}
|
||||||
|
return returnEvent, nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"})
|
recorder := StartRecording(&testEvents, api.EventSource{Component: "eventTest"})
|
||||||
@ -165,6 +310,39 @@ func TestEventf(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.T) (*api.Event, error) {
|
||||||
|
expectCompression := expectedEvent.Count > 1
|
||||||
|
// Just check that the timestamp was set.
|
||||||
|
if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() {
|
||||||
|
t.Errorf("timestamp wasn't set: %#v", *actualEvent)
|
||||||
|
}
|
||||||
|
if actualEvent.FirstTimestamp.Equal(actualEvent.LastTimestamp.Time) {
|
||||||
|
if expectCompression {
|
||||||
|
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurance of the event, but were different. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !expectCompression {
|
||||||
|
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
actualFirstTimestamp := actualEvent.FirstTimestamp
|
||||||
|
actualLastTimestamp := actualEvent.LastTimestamp
|
||||||
|
// Temp clear time stamps for comparison because actual values don't matter for comparison
|
||||||
|
actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp
|
||||||
|
actualEvent.LastTimestamp = expectedEvent.LastTimestamp
|
||||||
|
// Check that name has the right prefix.
|
||||||
|
if n, en := actualEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
|
||||||
|
t.Errorf("Name '%v' does not contain prefix '%v'", n, en)
|
||||||
|
}
|
||||||
|
actualEvent.Name = expectedEvent.Name
|
||||||
|
if e, a := expectedEvent, actualEvent; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("diff: %s", util.ObjectGoPrintDiff(e, a))
|
||||||
|
}
|
||||||
|
actualEvent.FirstTimestamp = actualFirstTimestamp
|
||||||
|
actualEvent.LastTimestamp = actualLastTimestamp
|
||||||
|
return actualEvent, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteEventError(t *testing.T) {
|
func TestWriteEventError(t *testing.T) {
|
||||||
ref := &api.ObjectReference{
|
ref := &api.ObjectReference{
|
||||||
Kind: "Pod",
|
Kind: "Pod",
|
||||||
@ -210,7 +388,7 @@ func TestWriteEventError(t *testing.T) {
|
|||||||
|
|
||||||
defer StartRecording(
|
defer StartRecording(
|
||||||
&testEventRecorder{
|
&testEventRecorder{
|
||||||
OnEvent: func(event *api.Event) (*api.Event, error) {
|
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||||
if event.Message == "finished" {
|
if event.Message == "finished" {
|
||||||
close(done)
|
close(done)
|
||||||
return event, nil
|
return event, nil
|
||||||
@ -250,7 +428,7 @@ func TestLotsOfEvents(t *testing.T) {
|
|||||||
// Fail each event a few times to ensure there's some load on the tested code.
|
// Fail each event a few times to ensure there's some load on the tested code.
|
||||||
var counts [1000]int
|
var counts [1000]int
|
||||||
testEvents := testEventRecorder{
|
testEvents := testEventRecorder{
|
||||||
OnEvent: func(event *api.Event) (*api.Event, error) {
|
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||||
num, err := strconv.Atoi(event.Message)
|
num, err := strconv.Atoi(event.Message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
81
pkg/client/record/events_cache.go
Normal file
81
pkg/client/record/events_cache.go
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package record
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type History struct {
|
||||||
|
// The number of times the event has occured since first occurance.
|
||||||
|
Count int
|
||||||
|
|
||||||
|
// The time at which the event was first recorded.
|
||||||
|
FirstTimestamp util.Time
|
||||||
|
|
||||||
|
// The unique name of the first occurance of this event
|
||||||
|
Name string
|
||||||
|
|
||||||
|
// Resource version returned from previous interaction with server
|
||||||
|
ResourceVersion string
|
||||||
|
}
|
||||||
|
|
||||||
|
type historyMap struct {
|
||||||
|
sync.RWMutex
|
||||||
|
table map[string]History
|
||||||
|
}
|
||||||
|
|
||||||
|
var previousEvents = historyMap{table: make(map[string]History)}
|
||||||
|
|
||||||
|
// AddOrUpdateEvent creates a new entry for the given event in the previous events hash table if the event
|
||||||
|
// doesn't already exist, otherwise it updates the existing entry.
|
||||||
|
func AddOrUpdateEvent(newEvent *api.Event) History {
|
||||||
|
key := getEventKey(newEvent)
|
||||||
|
previousEvents.Lock()
|
||||||
|
defer previousEvents.Unlock()
|
||||||
|
previousEvents.table[key] =
|
||||||
|
History{
|
||||||
|
Count: newEvent.Count,
|
||||||
|
FirstTimestamp: newEvent.FirstTimestamp,
|
||||||
|
Name: newEvent.Name,
|
||||||
|
ResourceVersion: newEvent.ResourceVersion,
|
||||||
|
}
|
||||||
|
return previousEvents.table[key]
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEvent returns the entry corresponding to the given event, if one exists, otherwise a History object
|
||||||
|
// with a count of 1 is returned.
|
||||||
|
func GetEvent(event *api.Event) History {
|
||||||
|
key := getEventKey(event)
|
||||||
|
previousEvents.RLock()
|
||||||
|
defer previousEvents.RUnlock()
|
||||||
|
return previousEvents.table[key]
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEventKey(event *api.Event) string {
|
||||||
|
return event.Source.Component +
|
||||||
|
event.Source.Host +
|
||||||
|
event.InvolvedObject.Kind +
|
||||||
|
event.InvolvedObject.Namespace +
|
||||||
|
event.InvolvedObject.Name +
|
||||||
|
string(event.InvolvedObject.UID) +
|
||||||
|
event.InvolvedObject.APIVersion +
|
||||||
|
event.Reason +
|
||||||
|
event.Message
|
||||||
|
}
|
187
pkg/client/record/events_cache_test.go
Normal file
187
pkg/client/record/events_cache_test.go
Normal file
@ -0,0 +1,187 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package record
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAddOrUpdateEventNoExisting(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
eventTime := util.Now()
|
||||||
|
event := api.Event{
|
||||||
|
Reason: "my reasons are many",
|
||||||
|
Message: "my message is love",
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "BoundPod",
|
||||||
|
Name: "awesome.name",
|
||||||
|
Namespace: "betterNamespace",
|
||||||
|
UID: "C934D34AFB20242",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
},
|
||||||
|
Source: api.EventSource{
|
||||||
|
Component: "kubelet",
|
||||||
|
Host: "kublet.node1",
|
||||||
|
},
|
||||||
|
Count: 1,
|
||||||
|
FirstTimestamp: eventTime,
|
||||||
|
LastTimestamp: eventTime,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act
|
||||||
|
result := AddOrUpdateEvent(&event)
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
compareEventWithHistoryEntry(&event, &result, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddOrUpdateEventExisting(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
event1Time := util.Unix(2324, 2342)
|
||||||
|
event2Time := util.Now()
|
||||||
|
event1 := api.Event{
|
||||||
|
Reason: "something happened",
|
||||||
|
Message: "can you believe it?",
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
ResourceVersion: "rs1",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Scheduler",
|
||||||
|
Name: "anOkName",
|
||||||
|
Namespace: "someNamespace",
|
||||||
|
UID: "C934D3234CD0242",
|
||||||
|
APIVersion: "v1beta2",
|
||||||
|
},
|
||||||
|
Source: api.EventSource{
|
||||||
|
Component: "kubelet",
|
||||||
|
Host: "kublet.node2",
|
||||||
|
},
|
||||||
|
Count: 1,
|
||||||
|
FirstTimestamp: event1Time,
|
||||||
|
LastTimestamp: event1Time,
|
||||||
|
}
|
||||||
|
event2 := api.Event{
|
||||||
|
Reason: "something happened",
|
||||||
|
Message: "can you believe it?",
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
ResourceVersion: "rs2",
|
||||||
|
},
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Scheduler",
|
||||||
|
Name: "anOkName",
|
||||||
|
Namespace: "someNamespace",
|
||||||
|
UID: "C934D3234CD0242",
|
||||||
|
APIVersion: "v1beta2",
|
||||||
|
},
|
||||||
|
Source: api.EventSource{
|
||||||
|
Component: "kubelet",
|
||||||
|
Host: "kublet.node2",
|
||||||
|
},
|
||||||
|
Count: 3,
|
||||||
|
FirstTimestamp: event1Time,
|
||||||
|
LastTimestamp: event2Time,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act
|
||||||
|
AddOrUpdateEvent(&event1)
|
||||||
|
result1 := AddOrUpdateEvent(&event2)
|
||||||
|
result2 := GetEvent(&event1)
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
compareEventWithHistoryEntry(&event2, &result1, t)
|
||||||
|
compareEventWithHistoryEntry(&event2, &result2, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetEventNoExisting(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
event := api.Event{
|
||||||
|
Reason: "to be or not to be",
|
||||||
|
Message: "do I exist",
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "Controller",
|
||||||
|
Name: "iAmAController",
|
||||||
|
Namespace: "IHaveANamespace",
|
||||||
|
UID: "9039D34AFBCDA42",
|
||||||
|
APIVersion: "v1beta3",
|
||||||
|
},
|
||||||
|
Source: api.EventSource{
|
||||||
|
Component: "kubelet",
|
||||||
|
Host: "kublet.node3",
|
||||||
|
},
|
||||||
|
Count: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Act
|
||||||
|
existingEvent := GetEvent(&event)
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
if existingEvent.Count != 0 {
|
||||||
|
t.Fatalf("There should be no existing instance of this event in the hash table.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetEventExisting(t *testing.T) {
|
||||||
|
// Arrange
|
||||||
|
eventTime := util.Now()
|
||||||
|
event := api.Event{
|
||||||
|
Reason: "do I exist",
|
||||||
|
Message: "I do, oh my",
|
||||||
|
InvolvedObject: api.ObjectReference{
|
||||||
|
Kind: "BoundPod",
|
||||||
|
Name: "clever.name.here",
|
||||||
|
Namespace: "spaceOfName",
|
||||||
|
UID: "D933D32AFB2A238",
|
||||||
|
APIVersion: "v1beta1",
|
||||||
|
},
|
||||||
|
Source: api.EventSource{
|
||||||
|
Component: "kubelet",
|
||||||
|
Host: "kublet.node4",
|
||||||
|
},
|
||||||
|
Count: 1,
|
||||||
|
FirstTimestamp: eventTime,
|
||||||
|
LastTimestamp: eventTime,
|
||||||
|
}
|
||||||
|
AddOrUpdateEvent(&event)
|
||||||
|
|
||||||
|
// Act
|
||||||
|
existingEvent := GetEvent(&event)
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
compareEventWithHistoryEntry(&event, &existingEvent, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func compareEventWithHistoryEntry(expected *api.Event, actual *History, t *testing.T) {
|
||||||
|
|
||||||
|
if actual.Count != expected.Count {
|
||||||
|
t.Fatalf("There should be one existing instance of this event in the hash table.")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !actual.FirstTimestamp.Equal(expected.FirstTimestamp.Time) {
|
||||||
|
t.Fatalf("Unexpected FirstTimestamp. Expected: <%v> Actual: <%v>", expected.FirstTimestamp, actual.FirstTimestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if actual.Name != expected.Name {
|
||||||
|
t.Fatalf("Unexpected Name. Expected: <%v> Actual: <%v>", expected.Name, actual.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if actual.ResourceVersion != expected.ResourceVersion {
|
||||||
|
t.Fatalf("Unexpected ResourceVersion. Expected: <%v> Actual: <%v>", expected.ResourceVersion, actual.ResourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -318,10 +318,12 @@ func describeEvents(el *api.EventList, w io.Writer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
sort.Sort(SortableEvents(el.Items))
|
sort.Sort(SortableEvents(el.Items))
|
||||||
fmt.Fprint(w, "Events:\nTime\tFrom\tSubobjectPath\tReason\tMessage\n")
|
fmt.Fprint(w, "Events:\nFirstSeen\tLastSeen\tCount\tFrom\tSubobjectPath\tReason\tMessage\n")
|
||||||
for _, e := range el.Items {
|
for _, e := range el.Items {
|
||||||
fmt.Fprintf(w, "%s\t%v\t%v\t%v\t%v\n",
|
fmt.Fprintf(w, "%s\t%s\t%d\t%v\t%v\t%v\t%v\n",
|
||||||
e.FirstTimestamp.Time.Format(time.RFC1123Z),
|
e.FirstTimestamp.Time.Format(time.RFC1123Z),
|
||||||
|
e.LastTimestamp.Time.Format(time.RFC1123Z),
|
||||||
|
e.Count,
|
||||||
e.Source,
|
e.Source,
|
||||||
e.InvolvedObject.FieldPath,
|
e.InvolvedObject.FieldPath,
|
||||||
e.Reason,
|
e.Reason,
|
||||||
|
@ -221,7 +221,7 @@ var serviceColumns = []string{"NAME", "LABELS", "SELECTOR", "IP", "PORT"}
|
|||||||
var endpointColumns = []string{"NAME", "ENDPOINTS"}
|
var endpointColumns = []string{"NAME", "ENDPOINTS"}
|
||||||
var minionColumns = []string{"NAME", "LABELS", "STATUS"}
|
var minionColumns = []string{"NAME", "LABELS", "STATUS"}
|
||||||
var statusColumns = []string{"STATUS"}
|
var statusColumns = []string{"STATUS"}
|
||||||
var eventColumns = []string{"TIME", "NAME", "KIND", "SUBOBJECT", "REASON", "SOURCE", "MESSAGE"}
|
var eventColumns = []string{"FIRSTSEEN", "LASTSEEN", "COUNT", "NAME", "KIND", "SUBOBJECT", "REASON", "SOURCE", "MESSAGE"}
|
||||||
var limitRangeColumns = []string{"NAME"}
|
var limitRangeColumns = []string{"NAME"}
|
||||||
var resourceQuotaColumns = []string{"NAME"}
|
var resourceQuotaColumns = []string{"NAME"}
|
||||||
var namespaceColumns = []string{"NAME", "LABELS"}
|
var namespaceColumns = []string{"NAME", "LABELS"}
|
||||||
@ -423,8 +423,10 @@ func printStatus(status *api.Status, w io.Writer) error {
|
|||||||
|
|
||||||
func printEvent(event *api.Event, w io.Writer) error {
|
func printEvent(event *api.Event, w io.Writer) error {
|
||||||
_, err := fmt.Fprintf(
|
_, err := fmt.Fprintf(
|
||||||
w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
|
w, "%s\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%s\n",
|
||||||
event.FirstTimestamp.Time.Format(time.RFC1123Z),
|
event.FirstTimestamp.Time.Format(time.RFC1123Z),
|
||||||
|
event.LastTimestamp.Time.Format(time.RFC1123Z),
|
||||||
|
event.Count,
|
||||||
event.InvolvedObject.Name,
|
event.InvolvedObject.Name,
|
||||||
event.InvolvedObject.Kind,
|
event.InvolvedObject.Kind,
|
||||||
event.InvolvedObject.FieldPath,
|
event.InvolvedObject.FieldPath,
|
||||||
|
Loading…
Reference in New Issue
Block a user