Merge pull request #1789 from lavalamp/eventing4

Add event creation library and implement in scheduler.
This commit is contained in:
Eric Tune 2014-10-15 15:55:26 -07:00
commit 6f577aa321
15 changed files with 391 additions and 10 deletions

View File

@ -76,7 +76,7 @@ func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodIn
Port: 10251,
}
default:
glog.Fatalf("Can't get info for: '%v', '%v'", host, podNamespace, podID)
glog.Fatalf("Can't get info for: '%v', '%v - %v'", host, podNamespace, podID)
}
return c.GetPodInfo("localhost", podNamespace, podID)
}

View File

@ -46,6 +46,7 @@ var (
preventSkew = flag.Bool("expect_version_match", false, "Fail if server's version doesn't match own version.")
config = flag.String("c", "", "Path or URL to the config file, or '-' to read from STDIN")
selector = flag.String("l", "", "Selector (label query) to use for listing")
fieldSelector = flag.String("fields", "", "Selector (field query) to use for listing")
updatePeriod = flag.Duration("u", 60*time.Second, "Update interval period")
portSpec = flag.String("p", "", "The port spec, comma-separated list of <external>:<internal>,...")
servicePort = flag.Int("s", -1, "If positive, create and run a corresponding service on this port, only used with 'run'")
@ -376,6 +377,9 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool {
if len(*selector) > 0 {
r.ParseSelectorParam("labels", *selector)
}
if len(*fieldSelector) > 0 {
r.ParseSelectorParam("fields", *fieldSelector)
}
if setBody {
if len(version) > 0 {
data := readConfig(storage, c.RESTClient.Codec)

View File

@ -19,10 +19,10 @@ set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
KUBE_REPO_ROOT=$(dirname "${BASH_SOURCE}")/..
# Set the environment variables required by the build.
source "${KUBE_ROOT}/hack/config-go.sh"
source "${KUBE_REPO_ROOT}/hack/config-go.sh"
# Go to the top of the tree.
cd "${KUBE_REPO_ROOT}"

View File

@ -139,7 +139,7 @@ func TestResourceVersioner(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if version != "10" {
t.Errorf("unexpected version %d", version)
t.Errorf("unexpected version %v", version)
}
}

View File

@ -18,6 +18,7 @@ limitations under the License.
package testapi
import (
"fmt"
"os"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
@ -52,3 +53,13 @@ func ResourceVersioner() runtime.ResourceVersioner {
}
return interfaces.ResourceVersioner
}
// SelfLink returns a self link that will appear to be for the version Version().
// 'resource' should be the resource path, e.g. "pods" for the Pod type. 'name' should be
// empty for lists.
func SelfLink(resource, name string) string {
if name == "" {
return fmt.Sprintf("/api/%s/%s", Version(), resource)
}
return fmt.Sprintf("/api/%s/%s/%s", Version(), resource, name)
}

View File

@ -73,6 +73,14 @@ type EndpointsInterface interface {
WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// EventInterface has methods to work with Event resources
type EventInterface interface {
CreateEvent(event *api.Event) (*api.Event, error)
ListEvents(selector labels.Selector) (*api.EventList, error)
GetEvent(id string) (*api.Event, error)
WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
}
// VersionInterface has a method to retrieve the server version.
type VersionInterface interface {
ServerVersion() (*version.Info, error)
@ -297,3 +305,40 @@ func (c *Client) GetMinion(id string) (result *api.Minion, err error) {
err = c.Get().Path("minions").Path(id).Do().Into(result)
return
}
// CreateEvent makes a new event. Returns the copy of the event the server returns, or an error.
func (c *Client) CreateEvent(event *api.Event) (*api.Event, error) {
result := &api.Event{}
err := c.Post().Path("events").Body(event).Do().Into(result)
return result, err
}
// ListEvents returns a list of events matching the selectors.
func (c *Client) ListEvents(label, field labels.Selector) (*api.EventList, error) {
result := &api.EventList{}
err := c.Get().
Path("events").
SelectorParam("labels", label).
SelectorParam("fields", field).
Do().
Into(result)
return result, err
}
// GetEvent returns the given event, or an error.
func (c *Client) GetEvent(id string) (*api.Event, error) {
result := &api.Event{}
err := c.Get().Path("events").Path(id).Do().Into(result)
return result, err
}
// WatchEvents starts watching for events matching the given selectors.
func (c *Client) WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("events").
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}

View File

@ -38,6 +38,7 @@ type Fake struct {
ServiceList api.ServiceList
EndpointsList api.EndpointsList
Minions api.MinionList
Events api.EventList
Err error
Watch watch.Interface
}
@ -152,3 +153,27 @@ func (c *Fake) ListMinions() (*api.MinionList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-minions", Value: nil})
return &c.Minions, nil
}
// CreateEvent makes a new event. Returns the copy of the event the server returns, or an error.
func (c *Fake) CreateEvent(event *api.Event) (*api.Event, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-event", Value: event.ID})
return &api.Event{}, nil
}
// ListEvents returns a list of events matching the selectors.
func (c *Fake) ListEvents(label, field labels.Selector) (*api.EventList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-events"})
return &c.Events, nil
}
// GetEvent returns the given event, or an error.
func (c *Fake) GetEvent(id string) (*api.Event, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-event", Value: id})
return &api.Event{}, nil
}
// WatchEvents starts watching for events matching the given selectors.
func (c *Fake) WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-events", Value: resourceVersion})
return c.Watch, c.Err
}

18
pkg/client/record/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package record has all client logic for recording and reporting events.
package record

119
pkg/client/record/event.go Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package record
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// EventRecorder knows how to store events (client.Client implements it.)
type EventRecorder interface {
CreateEvent(event *api.Event) (*api.Event, error)
}
// StartRecording starts sending events to recorder. Call once while initializing
// your binary. Subsequent calls will be ignored. The return value can be ignored
// or used to stop recording, if desired.
func StartRecording(recorder EventRecorder, sourceName string) watch.Interface {
return GetEvents(func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
event.Source = sourceName
for {
_, err := recorder.CreateEvent(event)
if err == nil {
break
}
glog.Errorf("Sleeping: Unable to write event: %v", err)
time.Sleep(10 * time.Second)
}
})
}
// StartLogging just logs local events, using the given logging function. The
// return value can be ignored or used to stop logging, if desired.
func StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return GetEvents(func(e *api.Event) {
logf("Event(%#v): status: '%v', reason: '%v' %v", e.InvolvedObject, e.Status, e.Reason, e.Message)
})
}
// GetEvents lets you see *local* events. Convenience function for testing. The
// return value can be ignored or used to stop logging, if desired.
func GetEvents(f func(*api.Event)) watch.Interface {
w := events.Watch()
go func() {
defer util.HandleCrash()
for {
watchEvent, open := <-w.ResultChan()
if !open {
return
}
event, ok := watchEvent.Object.(*api.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
f(event)
}
}()
return w
}
const queueLen = 1000
var events = watch.NewMux(queueLen)
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about; 'fieldPath', if not "", locates a part of 'object'.
// 'status' is the new status of the object. 'reason' is the reason it now has this status.
// Both 'status' and 'reason' should be short and unique; they will be used to automate
// handling of events, so imagine people writing switch statements to handle them. You want to
// make that easy.
// 'message' is intended to be human readable.
func Event(object runtime.Object, fieldPath, status, reason, message string) {
ref, err := api.GetReference(object)
if err != nil {
glog.Errorf("Could not construct reference to: %#v", object)
return
}
ref.FieldPath = fieldPath
e := &api.Event{
InvolvedObject: *ref,
Status: status,
Reason: reason,
Message: message,
}
events.Action(watch.Added, e)
}
// Eventf is just like Event, but with Sprintf for the message field.
func Eventf(object runtime.Object, fieldPath, status, reason, messageFmt string, args ...interface{}) {
Event(object, fieldPath, status, reason, fmt.Sprintf(messageFmt, args...))
}

View File

@ -0,0 +1,112 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package record_test
import (
"fmt"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type testEventRecorder struct {
OnEvent func(e *api.Event) (*api.Event, error)
}
// CreateEvent records the event for testing.
func (t *testEventRecorder) CreateEvent(e *api.Event) (*api.Event, error) {
if t.OnEvent != nil {
return t.OnEvent(e)
}
return e, nil
}
func (t *testEventRecorder) clearOnEvent() {
t.OnEvent = nil
}
func TestEventf(t *testing.T) {
table := []struct {
obj runtime.Object
fieldPath, status, reason string
messageFmt string
elements []interface{}
expect *api.Event
expectLog string
}{
{
obj: &api.Pod{
TypeMeta: api.TypeMeta{
SelfLink: "/api/v1beta1/pods/foo",
ID: "foo",
},
},
fieldPath: "desiredState.manifest.containers[2]",
status: "running",
reason: "started",
messageFmt: "some verbose message: %v",
elements: []interface{}{1},
expect: &api.Event{
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "foo",
UID: "foo",
APIVersion: "v1beta1",
FieldPath: "desiredState.manifest.containers[2]",
},
Status: "running",
Reason: "started",
Message: "some verbose message: 1",
Source: "eventTest",
},
expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"foo", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): status: 'running', reason: 'started' some verbose message: 1`,
},
}
for _, item := range table {
called := make(chan struct{})
testEvents := testEventRecorder{
OnEvent: func(a *api.Event) (*api.Event, error) {
if e := item.expect; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
called <- struct{}{}
return a, nil
},
}
recorder := record.StartRecording(&testEvents, "eventTest")
logger := record.StartLogging(t.Logf) // Prove that it is useful
logger2 := record.StartLogging(func(formatter string, args ...interface{}) {
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
t.Errorf("Expected '%v', got '%v'", e, a)
}
called <- struct{}{}
})
record.Eventf(item.obj, item.fieldPath, item.status, item.reason, item.messageFmt, item.elements...)
<-called
<-called
recorder.Stop()
logger.Stop()
logger2.Stop()
}
}

View File

@ -143,6 +143,7 @@ var replicationControllerColumns = []string{"ID", "Image(s)", "Selector", "Repli
var serviceColumns = []string{"ID", "Labels", "Selector", "Port"}
var minionColumns = []string{"Minion identifier"}
var statusColumns = []string{"Status"}
var eventColumns = []string{"Name", "Kind", "Status", "Reason", "Message"}
// addDefaultHandlers adds print handlers for default Kubernetes types.
func (h *HumanReadablePrinter) addDefaultHandlers() {
@ -155,6 +156,8 @@ func (h *HumanReadablePrinter) addDefaultHandlers() {
h.Handler(minionColumns, printMinion)
h.Handler(minionColumns, printMinionList)
h.Handler(statusColumns, printStatus)
h.Handler(eventColumns, printEvent)
h.Handler(eventColumns, printEventList)
}
func (h *HumanReadablePrinter) unknown(data []byte, w io.Writer) error {
@ -256,6 +259,27 @@ func printStatus(status *api.Status, w io.Writer) error {
return err
}
func printEvent(event *api.Event, w io.Writer) error {
_, err := fmt.Fprintf(
w, "%s\t%s\t%s\t%s\t%s\n",
event.InvolvedObject.Name,
event.InvolvedObject.Kind,
event.Status,
event.Reason,
event.Message,
)
return err
}
func printEventList(list *api.EventList, w io.Writer) error {
for i := range list.Items {
if err := printEvent(&list.Items[i], w); err != nil {
return err
}
}
return nil
}
// Print parses the data as JSON, then prints the parsed data in a human-friendly
// format according to the type of the data.
func (h *HumanReadablePrinter) Print(data []byte, output io.Writer) error {

View File

@ -23,6 +23,7 @@ import (
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
masterPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -55,6 +56,8 @@ func main() {
glog.Fatalf("Invalid API configuration: %v", err)
}
record.StartRecording(kubeClient, "scheduler")
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
configFactory := &factory.ConfigFactory{Client: kubeClient}

View File

@ -213,7 +213,7 @@ func (s *storeToMinionLister) GetNodeInfo(id string) (*api.Minion, error) {
if minion, ok := s.Get(id); ok {
return minion.(*api.Minion), nil
}
return nil, fmt.Errorf("minion '%v' is not in cache")
return nil, fmt.Errorf("minion '%v' is not in cache", id)
}
// storeToPodLister turns a store into a pod lister. The store must contain (only) pods.

View File

@ -18,6 +18,7 @@ package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
// TODO: move everything from pkg/scheduler into this package. Remove references from registry.
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -67,6 +68,7 @@ func (s *Scheduler) scheduleOne() {
pod := s.config.NextPod()
dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister)
if err != nil {
record.Eventf(pod, "", string(api.PodWaiting), "failedScheduling", "Error scheduling: %v", err)
s.config.Error(pod, err)
return
}
@ -75,6 +77,9 @@ func (s *Scheduler) scheduleOne() {
Host: dest,
}
if err := s.config.Binder.Bind(b); err != nil {
record.Eventf(pod, "", string(api.PodWaiting), "failedScheduling", "Binding rejected: %v", err)
s.config.Error(pod, err)
return
}
record.Eventf(pod, "", string(api.PodWaiting), "scheduled", "Successfully assigned %v to %v", pod.ID, dest)
}

View File

@ -22,6 +22,8 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
)
@ -32,7 +34,7 @@ type fakeBinder struct {
func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) }
func podWithID(id string) *api.Pod {
return &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}
return &api.Pod{TypeMeta: api.TypeMeta{ID: id, SelfLink: testapi.SelfLink("pods", id)}}
}
type mockScheduler struct {
@ -45,7 +47,7 @@ func (es mockScheduler) Schedule(pod api.Pod, ml scheduler.MinionLister) (string
}
func TestScheduler(t *testing.T) {
defer record.StartLogging(t.Logf).Stop()
errS := errors.New("scheduler")
errB := errors.New("binder")
@ -56,16 +58,19 @@ func TestScheduler(t *testing.T) {
expectErrorPod *api.Pod
expectError error
expectBind *api.Binding
eventReason string
}{
{
sendPod: podWithID("foo"),
algo: mockScheduler{"machine1", nil},
expectBind: &api.Binding{PodID: "foo", Host: "machine1"},
sendPod: podWithID("foo"),
algo: mockScheduler{"machine1", nil},
expectBind: &api.Binding{PodID: "foo", Host: "machine1"},
eventReason: "scheduled",
}, {
sendPod: podWithID("foo"),
algo: mockScheduler{"machine1", errS},
expectError: errS,
expectErrorPod: podWithID("foo"),
eventReason: "failedScheduling",
}, {
sendPod: podWithID("foo"),
algo: mockScheduler{"machine1", nil},
@ -73,6 +78,7 @@ func TestScheduler(t *testing.T) {
injectBindError: errB,
expectError: errB,
expectErrorPod: podWithID("foo"),
eventReason: "failedScheduling",
},
}
@ -98,6 +104,13 @@ func TestScheduler(t *testing.T) {
},
}
s := New(c)
called := make(chan struct{})
events := record.GetEvents(func(e *api.Event) {
if e, a := item.eventReason, e.Reason; e != a {
t.Errorf("%v: expected %v, got %v", i, e, a)
}
close(called)
})
s.scheduleOne()
if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
t.Errorf("%v: error pod: wanted %v, got %v", i, e, a)
@ -108,5 +121,7 @@ func TestScheduler(t *testing.T) {
if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) {
t.Errorf("%v: error: wanted %v, got %v", i, e, a)
}
<-called
events.Stop()
}
}