diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 5ddafc3cdee..6e56dcf5ded 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -76,7 +76,7 @@ func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodIn Port: 10251, } default: - glog.Fatalf("Can't get info for: '%v', '%v'", host, podNamespace, podID) + glog.Fatalf("Can't get info for: '%v', '%v - %v'", host, podNamespace, podID) } return c.GetPodInfo("localhost", podNamespace, podID) } diff --git a/cmd/kubecfg/kubecfg.go b/cmd/kubecfg/kubecfg.go index d781c9fd6f5..b947802948e 100644 --- a/cmd/kubecfg/kubecfg.go +++ b/cmd/kubecfg/kubecfg.go @@ -46,6 +46,7 @@ var ( preventSkew = flag.Bool("expect_version_match", false, "Fail if server's version doesn't match own version.") config = flag.String("c", "", "Path or URL to the config file, or '-' to read from STDIN") selector = flag.String("l", "", "Selector (label query) to use for listing") + fieldSelector = flag.String("fields", "", "Selector (field query) to use for listing") updatePeriod = flag.Duration("u", 60*time.Second, "Update interval period") portSpec = flag.String("p", "", "The port spec, comma-separated list of :,...") servicePort = flag.Int("s", -1, "If positive, create and run a corresponding service on this port, only used with 'run'") @@ -376,6 +377,9 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool { if len(*selector) > 0 { r.ParseSelectorParam("labels", *selector) } + if len(*fieldSelector) > 0 { + r.ParseSelectorParam("fields", *fieldSelector) + } if setBody { if len(version) > 0 { data := readConfig(storage, c.RESTClient.Codec) diff --git a/hack/vet-go.sh b/hack/vet-go.sh index 334ce11754f..1da59cf5042 100755 --- a/hack/vet-go.sh +++ b/hack/vet-go.sh @@ -19,10 +19,10 @@ set -o errexit set -o nounset set -o pipefail -KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. +KUBE_REPO_ROOT=$(dirname "${BASH_SOURCE}")/.. # Set the environment variables required by the build. -source "${KUBE_ROOT}/hack/config-go.sh" +source "${KUBE_REPO_ROOT}/hack/config-go.sh" # Go to the top of the tree. cd "${KUBE_REPO_ROOT}" diff --git a/pkg/api/latest/latest_test.go b/pkg/api/latest/latest_test.go index 3465e4d258e..fbef7f1ee4a 100644 --- a/pkg/api/latest/latest_test.go +++ b/pkg/api/latest/latest_test.go @@ -139,7 +139,7 @@ func TestResourceVersioner(t *testing.T) { t.Fatalf("unexpected error: %v", err) } if version != "10" { - t.Errorf("unexpected version %d", version) + t.Errorf("unexpected version %v", version) } } diff --git a/pkg/api/testapi/testapi.go b/pkg/api/testapi/testapi.go index 23a5b25391b..f9ef1816176 100644 --- a/pkg/api/testapi/testapi.go +++ b/pkg/api/testapi/testapi.go @@ -18,6 +18,7 @@ limitations under the License. package testapi import ( + "fmt" "os" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" @@ -52,3 +53,13 @@ func ResourceVersioner() runtime.ResourceVersioner { } return interfaces.ResourceVersioner } + +// SelfLink returns a self link that will appear to be for the version Version(). +// 'resource' should be the resource path, e.g. "pods" for the Pod type. 'name' should be +// empty for lists. +func SelfLink(resource, name string) string { + if name == "" { + return fmt.Sprintf("/api/%s/%s", Version(), resource) + } + return fmt.Sprintf("/api/%s/%s/%s", Version(), resource, name) +} diff --git a/pkg/client/client.go b/pkg/client/client.go index e4a0c49c39a..af47ea41714 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -73,6 +73,14 @@ type EndpointsInterface interface { WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) } +// EventInterface has methods to work with Event resources +type EventInterface interface { + CreateEvent(event *api.Event) (*api.Event, error) + ListEvents(selector labels.Selector) (*api.EventList, error) + GetEvent(id string) (*api.Event, error) + WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) +} + // VersionInterface has a method to retrieve the server version. type VersionInterface interface { ServerVersion() (*version.Info, error) @@ -297,3 +305,40 @@ func (c *Client) GetMinion(id string) (result *api.Minion, err error) { err = c.Get().Path("minions").Path(id).Do().Into(result) return } + +// CreateEvent makes a new event. Returns the copy of the event the server returns, or an error. +func (c *Client) CreateEvent(event *api.Event) (*api.Event, error) { + result := &api.Event{} + err := c.Post().Path("events").Body(event).Do().Into(result) + return result, err +} + +// ListEvents returns a list of events matching the selectors. +func (c *Client) ListEvents(label, field labels.Selector) (*api.EventList, error) { + result := &api.EventList{} + err := c.Get(). + Path("events"). + SelectorParam("labels", label). + SelectorParam("fields", field). + Do(). + Into(result) + return result, err +} + +// GetEvent returns the given event, or an error. +func (c *Client) GetEvent(id string) (*api.Event, error) { + result := &api.Event{} + err := c.Get().Path("events").Path(id).Do().Into(result) + return result, err +} + +// WatchEvents starts watching for events matching the given selectors. +func (c *Client) WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("events"). + Param("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} diff --git a/pkg/client/fake.go b/pkg/client/fake.go index 41eb9cd9c33..938deee4020 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -38,6 +38,7 @@ type Fake struct { ServiceList api.ServiceList EndpointsList api.EndpointsList Minions api.MinionList + Events api.EventList Err error Watch watch.Interface } @@ -152,3 +153,27 @@ func (c *Fake) ListMinions() (*api.MinionList, error) { c.Actions = append(c.Actions, FakeAction{Action: "list-minions", Value: nil}) return &c.Minions, nil } + +// CreateEvent makes a new event. Returns the copy of the event the server returns, or an error. +func (c *Fake) CreateEvent(event *api.Event) (*api.Event, error) { + c.Actions = append(c.Actions, FakeAction{Action: "get-event", Value: event.ID}) + return &api.Event{}, nil +} + +// ListEvents returns a list of events matching the selectors. +func (c *Fake) ListEvents(label, field labels.Selector) (*api.EventList, error) { + c.Actions = append(c.Actions, FakeAction{Action: "list-events"}) + return &c.Events, nil +} + +// GetEvent returns the given event, or an error. +func (c *Fake) GetEvent(id string) (*api.Event, error) { + c.Actions = append(c.Actions, FakeAction{Action: "get-event", Value: id}) + return &api.Event{}, nil +} + +// WatchEvents starts watching for events matching the given selectors. +func (c *Fake) WatchEvents(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + c.Actions = append(c.Actions, FakeAction{Action: "watch-events", Value: resourceVersion}) + return c.Watch, c.Err +} diff --git a/pkg/client/record/doc.go b/pkg/client/record/doc.go new file mode 100644 index 00000000000..ff6ccf3575f --- /dev/null +++ b/pkg/client/record/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package record has all client logic for recording and reporting events. +package record diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go new file mode 100644 index 00000000000..a6bd6f64562 --- /dev/null +++ b/pkg/client/record/event.go @@ -0,0 +1,119 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +// EventRecorder knows how to store events (client.Client implements it.) +type EventRecorder interface { + CreateEvent(event *api.Event) (*api.Event, error) +} + +// StartRecording starts sending events to recorder. Call once while initializing +// your binary. Subsequent calls will be ignored. The return value can be ignored +// or used to stop recording, if desired. +func StartRecording(recorder EventRecorder, sourceName string) watch.Interface { + return GetEvents(func(event *api.Event) { + // Make a copy before modification, because there could be multiple listeners. + // Events are safe to copy like this. + eventCopy := *event + event = &eventCopy + event.Source = sourceName + for { + _, err := recorder.CreateEvent(event) + if err == nil { + break + } + glog.Errorf("Sleeping: Unable to write event: %v", err) + time.Sleep(10 * time.Second) + } + }) +} + +// StartLogging just logs local events, using the given logging function. The +// return value can be ignored or used to stop logging, if desired. +func StartLogging(logf func(format string, args ...interface{})) watch.Interface { + return GetEvents(func(e *api.Event) { + logf("Event(%#v): status: '%v', reason: '%v' %v", e.InvolvedObject, e.Status, e.Reason, e.Message) + }) +} + +// GetEvents lets you see *local* events. Convenience function for testing. The +// return value can be ignored or used to stop logging, if desired. +func GetEvents(f func(*api.Event)) watch.Interface { + w := events.Watch() + go func() { + defer util.HandleCrash() + for { + watchEvent, open := <-w.ResultChan() + if !open { + return + } + event, ok := watchEvent.Object.(*api.Event) + if !ok { + // This is all local, so there's no reason this should + // ever happen. + continue + } + f(event) + } + }() + return w +} + +const queueLen = 1000 + +var events = watch.NewMux(queueLen) + +// Event constructs an event from the given information and puts it in the queue for sending. +// 'object' is the object this event is about; 'fieldPath', if not "", locates a part of 'object'. +// 'status' is the new status of the object. 'reason' is the reason it now has this status. +// Both 'status' and 'reason' should be short and unique; they will be used to automate +// handling of events, so imagine people writing switch statements to handle them. You want to +// make that easy. +// 'message' is intended to be human readable. +func Event(object runtime.Object, fieldPath, status, reason, message string) { + ref, err := api.GetReference(object) + if err != nil { + glog.Errorf("Could not construct reference to: %#v", object) + return + } + ref.FieldPath = fieldPath + e := &api.Event{ + InvolvedObject: *ref, + Status: status, + Reason: reason, + Message: message, + } + + events.Action(watch.Added, e) +} + +// Eventf is just like Event, but with Sprintf for the message field. +func Eventf(object runtime.Object, fieldPath, status, reason, messageFmt string, args ...interface{}) { + Event(object, fieldPath, status, reason, fmt.Sprintf(messageFmt, args...)) +} diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go new file mode 100644 index 00000000000..5babef0b389 --- /dev/null +++ b/pkg/client/record/event_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package record_test + +import ( + "fmt" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +type testEventRecorder struct { + OnEvent func(e *api.Event) (*api.Event, error) +} + +// CreateEvent records the event for testing. +func (t *testEventRecorder) CreateEvent(e *api.Event) (*api.Event, error) { + if t.OnEvent != nil { + return t.OnEvent(e) + } + return e, nil +} + +func (t *testEventRecorder) clearOnEvent() { + t.OnEvent = nil +} + +func TestEventf(t *testing.T) { + table := []struct { + obj runtime.Object + fieldPath, status, reason string + messageFmt string + elements []interface{} + expect *api.Event + expectLog string + }{ + { + obj: &api.Pod{ + TypeMeta: api.TypeMeta{ + SelfLink: "/api/v1beta1/pods/foo", + ID: "foo", + }, + }, + fieldPath: "desiredState.manifest.containers[2]", + status: "running", + reason: "started", + messageFmt: "some verbose message: %v", + elements: []interface{}{1}, + expect: &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "foo", + APIVersion: "v1beta1", + FieldPath: "desiredState.manifest.containers[2]", + }, + Status: "running", + Reason: "started", + Message: "some verbose message: 1", + Source: "eventTest", + }, + expectLog: `Event(api.ObjectReference{Kind:"Pod", Namespace:"", Name:"foo", UID:"foo", APIVersion:"v1beta1", ResourceVersion:"", FieldPath:"desiredState.manifest.containers[2]"}): status: 'running', reason: 'started' some verbose message: 1`, + }, + } + + for _, item := range table { + called := make(chan struct{}) + testEvents := testEventRecorder{ + OnEvent: func(a *api.Event) (*api.Event, error) { + if e := item.expect; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } + called <- struct{}{} + return a, nil + }, + } + recorder := record.StartRecording(&testEvents, "eventTest") + logger := record.StartLogging(t.Logf) // Prove that it is useful + logger2 := record.StartLogging(func(formatter string, args ...interface{}) { + if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { + t.Errorf("Expected '%v', got '%v'", e, a) + } + called <- struct{}{} + }) + + record.Eventf(item.obj, item.fieldPath, item.status, item.reason, item.messageFmt, item.elements...) + + <-called + <-called + recorder.Stop() + logger.Stop() + logger2.Stop() + } +} diff --git a/pkg/kubecfg/resource_printer.go b/pkg/kubecfg/resource_printer.go index c4e037248c7..f54532a3f94 100644 --- a/pkg/kubecfg/resource_printer.go +++ b/pkg/kubecfg/resource_printer.go @@ -143,6 +143,7 @@ var replicationControllerColumns = []string{"ID", "Image(s)", "Selector", "Repli var serviceColumns = []string{"ID", "Labels", "Selector", "Port"} var minionColumns = []string{"Minion identifier"} var statusColumns = []string{"Status"} +var eventColumns = []string{"Name", "Kind", "Status", "Reason", "Message"} // addDefaultHandlers adds print handlers for default Kubernetes types. func (h *HumanReadablePrinter) addDefaultHandlers() { @@ -155,6 +156,8 @@ func (h *HumanReadablePrinter) addDefaultHandlers() { h.Handler(minionColumns, printMinion) h.Handler(minionColumns, printMinionList) h.Handler(statusColumns, printStatus) + h.Handler(eventColumns, printEvent) + h.Handler(eventColumns, printEventList) } func (h *HumanReadablePrinter) unknown(data []byte, w io.Writer) error { @@ -256,6 +259,27 @@ func printStatus(status *api.Status, w io.Writer) error { return err } +func printEvent(event *api.Event, w io.Writer) error { + _, err := fmt.Fprintf( + w, "%s\t%s\t%s\t%s\t%s\n", + event.InvolvedObject.Name, + event.InvolvedObject.Kind, + event.Status, + event.Reason, + event.Message, + ) + return err +} + +func printEventList(list *api.EventList, w io.Writer) error { + for i := range list.Items { + if err := printEvent(&list.Items[i], w); err != nil { + return err + } + } + return nil +} + // Print parses the data as JSON, then prints the parsed data in a human-friendly // format according to the type of the data. func (h *HumanReadablePrinter) Print(data []byte, output io.Writer) error { diff --git a/plugin/cmd/scheduler/scheduler.go b/plugin/cmd/scheduler/scheduler.go index ca872ef0437..ead41b7e166 100644 --- a/plugin/cmd/scheduler/scheduler.go +++ b/plugin/cmd/scheduler/scheduler.go @@ -23,6 +23,7 @@ import ( "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" masterPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -55,6 +56,8 @@ func main() { glog.Fatalf("Invalid API configuration: %v", err) } + record.StartRecording(kubeClient, "scheduler") + go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) configFactory := &factory.ConfigFactory{Client: kubeClient} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 81d413ff385..5a57a069c16 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -213,7 +213,7 @@ func (s *storeToMinionLister) GetNodeInfo(id string) (*api.Minion, error) { if minion, ok := s.Get(id); ok { return minion.(*api.Minion), nil } - return nil, fmt.Errorf("minion '%v' is not in cache") + return nil, fmt.Errorf("minion '%v' is not in cache", id) } // storeToPodLister turns a store into a pod lister. The store must contain (only) pods. diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 2c6f81483e2..c1527600e13 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -18,6 +18,7 @@ package scheduler import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" // TODO: move everything from pkg/scheduler into this package. Remove references from registry. "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -67,6 +68,7 @@ func (s *Scheduler) scheduleOne() { pod := s.config.NextPod() dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister) if err != nil { + record.Eventf(pod, "", string(api.PodWaiting), "failedScheduling", "Error scheduling: %v", err) s.config.Error(pod, err) return } @@ -75,6 +77,9 @@ func (s *Scheduler) scheduleOne() { Host: dest, } if err := s.config.Binder.Bind(b); err != nil { + record.Eventf(pod, "", string(api.PodWaiting), "failedScheduling", "Binding rejected: %v", err) s.config.Error(pod, err) + return } + record.Eventf(pod, "", string(api.PodWaiting), "scheduled", "Successfully assigned %v to %v", pod.ID, dest) } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 5d0df3722d3..421e4267789 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -22,6 +22,8 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" ) @@ -32,7 +34,7 @@ type fakeBinder struct { func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) } func podWithID(id string) *api.Pod { - return &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}} + return &api.Pod{TypeMeta: api.TypeMeta{ID: id, SelfLink: testapi.SelfLink("pods", id)}} } type mockScheduler struct { @@ -45,7 +47,7 @@ func (es mockScheduler) Schedule(pod api.Pod, ml scheduler.MinionLister) (string } func TestScheduler(t *testing.T) { - + defer record.StartLogging(t.Logf).Stop() errS := errors.New("scheduler") errB := errors.New("binder") @@ -56,16 +58,19 @@ func TestScheduler(t *testing.T) { expectErrorPod *api.Pod expectError error expectBind *api.Binding + eventReason string }{ { - sendPod: podWithID("foo"), - algo: mockScheduler{"machine1", nil}, - expectBind: &api.Binding{PodID: "foo", Host: "machine1"}, + sendPod: podWithID("foo"), + algo: mockScheduler{"machine1", nil}, + expectBind: &api.Binding{PodID: "foo", Host: "machine1"}, + eventReason: "scheduled", }, { sendPod: podWithID("foo"), algo: mockScheduler{"machine1", errS}, expectError: errS, expectErrorPod: podWithID("foo"), + eventReason: "failedScheduling", }, { sendPod: podWithID("foo"), algo: mockScheduler{"machine1", nil}, @@ -73,6 +78,7 @@ func TestScheduler(t *testing.T) { injectBindError: errB, expectError: errB, expectErrorPod: podWithID("foo"), + eventReason: "failedScheduling", }, } @@ -98,6 +104,13 @@ func TestScheduler(t *testing.T) { }, } s := New(c) + called := make(chan struct{}) + events := record.GetEvents(func(e *api.Event) { + if e, a := item.eventReason, e.Reason; e != a { + t.Errorf("%v: expected %v, got %v", i, e, a) + } + close(called) + }) s.scheduleOne() if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) { t.Errorf("%v: error pod: wanted %v, got %v", i, e, a) @@ -108,5 +121,7 @@ func TestScheduler(t *testing.T) { if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) { t.Errorf("%v: error: wanted %v, got %v", i, e, a) } + <-called + events.Stop() } }