diff --git a/pkg/client/client.go b/pkg/client/client.go index e4a0c49c39a..af47ea41714 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -73,6 +73,14 @@ type EndpointsInterface interface { WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } +// EventInterface has methods to work with Event resources +type EventInterface interface { + CreateEvent(event *api.Event) (*api.Event, error) + ListEvents(selector labels.Selector) (*api.EventList, error) + GetEvent(id string) (*api.Event, error) + WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) +} + // VersionInterface has a method to retrieve the server version. type VersionInterface interface { ServerVersion() (*version.Info, error) @@ -297,3 +305,40 @@ func (c *Client) GetMinion(id string) (result *api.Minion, err error) { err = c.Get().Path("minions").Path(id).Do().Into(result) return } + +// CreateEvent makes a new event. Returns the copy of the event the server returns, or an error. +func (c *Client) CreateEvent(event *api.Event) (*api.Event, error) { + result := &api.Event{} + err := c.Post().Path("events").Body(event).Do().Into(result) + return result, err +} + +// ListEvents returns a list of events matching the selectors. +func (c *Client) ListEvents(label, field labels.Selector) (*api.EventList, error) { + result := &api.EventList{} + err := c.Get(). + Path("events"). + SelectorParam("labels", label). + SelectorParam("fields", field). + Do(). + Into(result) + return result, err +} + +// GetEvent returns the given event, or an error. +func (c *Client) GetEvent(id string) (*api.Event, error) { + result := &api.Event{} + err := c.Get().Path("events").Path(id).Do().Into(result) + return result, err +} + +// WatchEvents starts watching for events matching the given selectors. +func (c *Client) WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("events"). + Param("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 41eb9cd9c33..938deee4020 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -38,6 +38,7 @@ type Fake struct { ServiceList api.ServiceList EndpointsList api.EndpointsList Minions api.MinionList + Events api.EventList Err error Watch watch.Interface } @@ -152,3 +153,27 @@ func (c *Fake) ListMinions() (*api.MinionList, error) { c.Actions = append(c.Actions, FakeAction{Action: "list-minions", Value: nil}) return &c.Minions, nil } + +// CreateEvent makes a new event. Returns the copy of the event the server returns, or an error. +func (c *Fake) CreateEvent(event *api.Event) (*api.Event, error) { + c.Actions = append(c.Actions, FakeAction{Action: "get-event", Value: event.ID}) + return &api.Event{}, nil +} + +// ListEvents returns a list of events matching the selectors. +func (c *Fake) ListEvents(label, field labels.Selector) (*api.EventList, error) { + c.Actions = append(c.Actions, FakeAction{Action: "list-events"}) + return &c.Events, nil +} + +// GetEvent returns the given event, or an error. +func (c *Fake) GetEvent(id string) (*api.Event, error) { + c.Actions = append(c.Actions, FakeAction{Action: "get-event", Value: id}) + return &api.Event{}, nil +} + +// WatchEvents starts watching for events matching the given selectors. +func (c *Fake) WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + c.Actions = append(c.Actions, FakeAction{Action: "watch-events", Value: resourceVersion}) + return c.Watch, c.Err +} diff --git a/pkg/client/record/doc.go b/pkg/client/record/doc.go new file mode 100644 index 00000000000..ff6ccf3575f --- /dev/null +++ b/pkg/client/record/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package record has all client logic for recording and reporting events. +package record diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go new file mode 100644 index 00000000000..a6bd6f64562 --- /dev/null +++ b/pkg/client/record/event.go @@ -0,0 +1,119 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +// EventRecorder knows how to store events (client.Client implements it.) +type EventRecorder interface { + CreateEvent(event *api.Event) (*api.Event, error) +} + +// StartRecording starts sending events to recorder. Call once while initializing +// your binary. Subsequent calls will be ignored. The return value can be ignored +// or used to stop recording, if desired. +func StartRecording(recorder EventRecorder, sourceName string) watch.Interface { + return GetEvents(func(event *api.Event) { + // Make a copy before modification, because there could be multiple listeners. + // Events are safe to copy like this. + eventCopy := *event + event = &eventCopy + event.Source = sourceName + for { + _, err := recorder.CreateEvent(event) + if err == nil { + break + } + glog.Errorf("Sleeping: Unable to write event: %v", err) + time.Sleep(10 * time.Second) + } + }) +} + +// StartLogging just logs local events, using the given logging function. The +// return value can be ignored or used to stop logging, if desired. +func StartLogging(logf func(format string, args ...interface{})) watch.Interface { + return GetEvents(func(e *api.Event) { + logf("Event(%#v): status: '%v', reason: '%v' %v", e.InvolvedObject, e.Status, e.Reason, e.Message) + }) +} + +// GetEvents lets you see *local* events. Convenience function for testing. The +// return value can be ignored or used to stop logging, if desired. +func GetEvents(f func(*api.Event)) watch.Interface { + w := events.Watch() + go func() { + defer util.HandleCrash() + for { + watchEvent, open := <-w.ResultChan() + if !open { + return + } + event, ok := watchEvent.Object.(*api.Event) + if !ok { + // This is all local, so there's no reason this should + // ever happen. + continue + } + f(event) + } + }() + return w +} + +const queueLen = 1000 + +var events = watch.NewMux(queueLen) + +// Event constructs an event from the given information and puts it in the queue for sending. +// 'object' is the object this event is about; 'fieldPath', if not "", locates a part of 'object'. +// 'status' is the new status of the object. 'reason' is the reason it now has this status. +// Both 'status' and 'reason' should be short and unique; they will be used to automate +// handling of events, so imagine people writing switch statements to handle them. You want to +// make that easy. +// 'message' is intended to be human readable. +func Event(object runtime.Object, fieldPath, status, reason, message string) { + ref, err := api.GetReference(object) + if err != nil { + glog.Errorf("Could not construct reference to: %#v", object) + return + } + ref.FieldPath = fieldPath + e := &api.Event{ + InvolvedObject: *ref, + Status: status, + Reason: reason, + Message: message, + } + + events.Action(watch.Added, e) +} + +// Eventf is just like Event, but with Sprintf for the message field. +func Eventf(object runtime.Object, fieldPath, status, reason, messageFmt string, args ...interface{}) { + Event(object, fieldPath, status, reason, fmt.Sprintf(messageFmt, args...)) +} diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go new file mode 100644 index 00000000000..5babef0b389 --- /dev/null +++ b/pkg/client/record/event_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record_test + +import ( + "fmt" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +type testEventRecorder struct { + OnEvent func(e *api.Event) (*api.Event, error) +} + +// CreateEvent records the event for testing. +func (t *testEventRecorder) CreateEvent(e *api.Event) (*api.Event, error) { + if t.OnEvent != nil { + return t.OnEvent(e) + } + return e, nil +} + +func (t *testEventRecorder) clearOnEvent() { + t.OnEvent = nil +} + +func TestEventf(t *testing.T) { + table := []struct { + obj runtime.Object + fieldPath, status, reason string + messageFmt string + elements []interface{} + expect *api.Event + expectLog string + }{ + { + obj: &api.Pod{ + TypeMeta: api.TypeMeta{ + SelfLink: "/api/v1beta1/pods/foo", + ID: "foo", + }, + }, + fieldPath: "desiredState.manifest.containers[2]", + status: "running", + reason: "started", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "foo", + APIVersion: "v1beta1", + FieldPath: "desiredState.manifest.containers[2]", + }, + Status: "running", + Reason: "started", + Message: "some verbose message: 1", + Source: "eventTest", + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"foo", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): status: 'running', reason: 'started' some verbose message: 1`, + }, + } + + for _, item := range table { + called := make(chan struct{}) + testEvents := testEventRecorder{ + OnEvent: func(a *api.Event) (*api.Event, error) { + if e := item.expect; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } + called <- struct{}{} + return a, nil + }, + } + recorder := record.StartRecording(&testEvents, "eventTest") + logger := record.StartLogging(t.Logf) // Prove that it is useful + logger2 := record.StartLogging(func(formatter string, args ...interface{}) { + if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { + t.Errorf("Expected '%v', got '%v'", e, a) + } + called <- struct{}{} + }) + + record.Eventf(item.obj, item.fieldPath, item.status, item.reason, item.messageFmt, item.elements...) + + <-called + <-called + recorder.Stop() + logger.Stop() + logger2.Stop() + } +}