diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index aa7c6e191fb..99cca0773cc 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -204,6 +204,9 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st if err != nil { glog.Fatalf("Couldn't create scheduler config: %v", err) } + eventBroadcaster := record.NewBroadcaster() + schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) + eventBroadcaster.StartRecordingToSink(cl.Events("")) scheduler.New(schedulerConfig).Run() endpoints := service.NewEndpointController(cl) @@ -221,8 +224,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), }} - nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, - record.FromSource(api.EventSource{Component: "controllermanager"}), 10, 5*time.Minute) + nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute) nodeController.Run(5*time.Second, true, false) cadvisorInterface := new(cadvisor.Fake) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0f48884c14f..13a94d8c2fa 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -29,7 +29,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" @@ -178,8 +177,7 @@ func (s *CMServer) Run(_ []string) error { } nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, - kubeClient, kubeletClient, record.FromSource(api.EventSource{Component: "controllermanager"}), - s.RegisterRetryCount, s.PodEvictionTimeout) + kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus) resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 19af21c1b02..5dcacf99d1e 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -323,13 +323,15 @@ func SimpleKubelet(client *client.Client, // Eventually, #2 will be replaced with instances of #3 func RunKubelet(kcfg *KubeletConfig) { kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride) - kcfg.Recorder = record.FromSource(api.EventSource{Component: "kubelet", Host: kcfg.Hostname}) + eventBroadcaster := record.NewBroadcaster() + kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname}) + eventBroadcaster.StartLogging(glog.Infof) if kcfg.KubeClient != nil { - kubelet.SetupEventSending(kcfg.KubeClient, kcfg.Hostname) + glog.Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events("")) } else { - glog.Infof("No api server defined - no events will be sent.") + glog.Infof("No api server defined - no events will be sent to API server.") } - kubelet.SetupLogging() kubelet.SetupCapabilities(kcfg.AllowPrivileged, kcfg.HostNetworkSources) credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 40a225e07a1..4d0ef780430 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -34,7 +34,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" @@ -129,8 +128,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, } kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort} - nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, - record.FromSource(api.EventSource{Component: "controllermanager"}), 10, 5*time.Minute) + nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute) nodeController.Run(10*time.Second, true, true) endpoints := service.NewEndpointController(cl) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index b41af7704e2..33db471816e 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -35,6 +35,8 @@ const maxTriesPerEvent = 12 var sleepDuration = 10 * time.Second +const maxQueuedEvents = 1000 + // EventSink knows how to store events (client.Client implements it.) // EventSink must respect the namespace that will be embedded in 'event'. // It is assumed that EventSink will return the same sorts of errors as @@ -44,50 +46,94 @@ type EventSink interface { Update(event *api.Event) (*api.Event, error) } -var emptySource = api.EventSource{} +// EventRecorder knows how to record events on behalf of an EventSource. +type EventRecorder interface { + // Event constructs an event from the given information and puts it in the queue for sending. + // 'object' is the object this event is about. Event will make a reference-- or you may also + // pass a reference to the object directly. + // 'reason' is the reason this event is generated. 'reason' should be short and unique; it will + // be used to automate handling of events, so imagine people writing switch statements to + // handle them. You want to make that easy. + // 'message' is intended to be human readable. + // + // The resulting event will be created in the same namespace as the reference object. + Event(object runtime.Object, reason, message string) -// StartRecording starts sending events to a sink. Call once while initializing -// your binary. Subsequent calls will be ignored. The return value can be ignored -// or used to stop recording, if desired. + // Eventf is just like Event, but with Sprintf for the message field. + Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) +} + +// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. +type EventBroadcaster interface { + // StartEventWatcher starts sending events recieved from this EventBroadcaster to the given + // event handler function. The return value can be ignored or used to stop recording, if + // desired. + StartEventWatcher(eventHandler func(*api.Event)) watch.Interface + + // StartRecordingToSink starts sending events recieved from this EventBroadcaster to the given + // sink. The return value can be ignored or used to stop recording, if desired. + StartRecordingToSink(sink EventSink) watch.Interface + + // StartLogging starts sending events recieved from this EventBroadcaster to the given logging + // function. The return value can be ignored or used to stop recording, if desired. + StartLogging(logf func(format string, args ...interface{})) watch.Interface + + // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster + // with the event source set to the given event source. + NewRecorder(source api.EventSource) EventRecorder +} + +// Creates a new event broadcaster. +func NewBroadcaster() EventBroadcaster { + return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)} +} + +type eventBroadcasterImpl struct { + *watch.Broadcaster +} + +// StartRecordingToSink starts sending events recieved from the specified eventBroadcaster to the given sink. +// The return value can be ignored or used to stop recording, if desired. // TODO: make me an object with parameterizable queue length and retry interval -func StartRecording(sink EventSink) watch.Interface { +func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface { // The default math/rand package functions aren't thread safe, so create a // new Rand object for each StartRecording call. randGen := rand.New(rand.NewSource(time.Now().UnixNano())) - return GetEvents(func(event *api.Event) { - // Make a copy before modification, because there could be multiple listeners. - // Events are safe to copy like this. - eventCopy := *event - event = &eventCopy + return eventBroadcaster.StartEventWatcher( + func(event *api.Event) { + // Make a copy before modification, because there could be multiple listeners. + // Events are safe to copy like this. + eventCopy := *event + event = &eventCopy - previousEvent := getEvent(event) - updateExistingEvent := previousEvent.Count > 0 - if updateExistingEvent { - event.Count = previousEvent.Count + 1 - event.FirstTimestamp = previousEvent.FirstTimestamp - event.Name = previousEvent.Name - event.ResourceVersion = previousEvent.ResourceVersion - } + previousEvent := getEvent(event) + updateExistingEvent := previousEvent.Count > 0 + if updateExistingEvent { + event.Count = previousEvent.Count + 1 + event.FirstTimestamp = previousEvent.FirstTimestamp + event.Name = previousEvent.Name + event.ResourceVersion = previousEvent.ResourceVersion + } - tries := 0 - for { - if recordEvent(sink, event, updateExistingEvent) { - break + tries := 0 + for { + if recordEvent(sink, event, updateExistingEvent) { + break + } + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) + } else { + time.Sleep(sleepDuration) + } } - tries++ - if tries >= maxTriesPerEvent { - glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) - break - } - // Randomize the first sleep so that various clients won't all be - // synced up if the master goes down. - if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) - } else { - time.Sleep(sleepDuration) - } - } - }) + }) } // recordEvent attempts to write event to a sink. It returns true if the event @@ -131,22 +177,23 @@ func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) boo return false } -// StartLogging just logs local events, using the given logging function. The -// return value can be ignored or used to stop logging, if desired. -func StartLogging(logf func(format string, args ...interface{})) watch.Interface { - return GetEvents(func(e *api.Event) { - logf("Event(%#v): reason: '%v' %v", e.InvolvedObject, e.Reason, e.Message) - }) +// StartLogging starts sending events recieved from this EventBroadcaster to the given logging function. +// The return value can be ignored or used to stop recording, if desired. +func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface { + return eventBroadcaster.StartEventWatcher( + func(e *api.Event) { + logf("Event(%#v): reason: '%v' %v", e.InvolvedObject, e.Reason, e.Message) + }) } -// GetEvents lets you see *local* events. Convenience function for testing. The -// return value can be ignored or used to stop logging, if desired. -func GetEvents(f func(*api.Event)) watch.Interface { - w := events.Watch() +// StartEventWatcher starts sending events recieved from this EventBroadcaster to the given event handler function. +// The return value can be ignored or used to stop recording, if desired. +func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*api.Event)) watch.Interface { + watcher := eventBroadcaster.Watch() go func() { defer util.HandleCrash() for { - watchEvent, open := <-w.ResultChan() + watchEvent, open := <-watcher.ResultChan() if !open { return } @@ -156,58 +203,37 @@ func GetEvents(f func(*api.Event)) watch.Interface { // ever happen. continue } - f(event) + eventHandler(event) } }() - return w + return watcher } -const maxQueuedEvents = 1000 - -var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull) - -// EventRecorder knows how to record events for an EventSource. -type EventRecorder interface { - // Event constructs an event from the given information and puts it in the queue for sending. - // 'object' is the object this event is about. Event will make a reference-- or you may also - // pass a reference to the object directly. - // 'reason' is the reason this event is generated. 'reason' should be short and unique; it will - // be used to automate handling of events, so imagine people writing switch statements to - // handle them. You want to make that easy. - // 'message' is intended to be human readable. - // - // The resulting event will be created in the same namespace as the reference object. - Event(object runtime.Object, reason, message string) - - // Eventf is just like Event, but with Sprintf for the message field. - Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) -} - -// FromSource returns an EventRecorder that records events with the -// given event source. -func FromSource(source api.EventSource) EventRecorder { - return &recorderImpl{source} +// NewRecorder returns an EventRecorder that records events with the given event source. +func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source api.EventSource) EventRecorder { + return &recorderImpl{source, eventBroadcaster.Broadcaster} } type recorderImpl struct { source api.EventSource + *watch.Broadcaster } -func (i *recorderImpl) Event(object runtime.Object, reason, message string) { +func (recorder *recorderImpl) Event(object runtime.Object, reason, message string) { ref, err := api.GetReference(object) if err != nil { glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v'", object, err, reason, message) return } - e := makeEvent(ref, reason, message) - e.Source = i.source + event := makeEvent(ref, reason, message) + event.Source = recorder.source - events.Action(watch.Added, e) + recorder.Action(watch.Added, event) } -func (i *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) { - i.Event(object, reason, fmt.Sprintf(messageFmt, args...)) +func (recorder *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) { + recorder.Event(object, reason, fmt.Sprintf(messageFmt, args...)) } func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event { diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index 728c10301df..8c9de74b884 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -291,23 +291,23 @@ func TestEventf(t *testing.T) { return returnEvent, nil }, } - recorder := StartRecording(&testEvents) - logger := StartLogging(t.Logf) // Prove that it is useful - logger2 := StartLogging(func(formatter string, args ...interface{}) { + eventBroadcaster := NewBroadcaster() + sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful + logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } called <- struct{}{} }) - - testSource := api.EventSource{Component: "eventTest"} - FromSource(testSource).Eventf(item.obj, item.reason, item.messageFmt, item.elements...) + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) + recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) <-called <-called - recorder.Stop() - logger.Stop() - logger2.Stop() + sinkWatcher.Stop() + logWatcher1.Stop() + logWatcher2.Stop() } } @@ -387,7 +387,8 @@ func TestWriteEventError(t *testing.T) { } done := make(chan struct{}) - defer StartRecording( + eventBroadcaster := NewBroadcaster() + defer eventBroadcaster.StartRecordingToSink( &testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { if event.Message == "finished" { @@ -407,12 +408,11 @@ func TestWriteEventError(t *testing.T) { }, }, ).Stop() - - testSource := api.EventSource{Component: "eventTest"} + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) for caseName := range table { - FromSource(testSource).Event(ref, "Reason", caseName) + recorder.Event(ref, "Reason", caseName) } - FromSource(testSource).Event(ref, "Reason", "finished") + recorder.Event(ref, "Reason", "finished") <-done for caseName, item := range table { @@ -443,12 +443,13 @@ func TestLotsOfEvents(t *testing.T) { return event, nil }, } - recorder := StartRecording(&testEvents) - testSource := api.EventSource{Component: "eventTest"} - logger := StartLogging(func(formatter string, args ...interface{}) { + + eventBroadcaster := NewBroadcaster() + sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) + logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { loggerCalled <- struct{}{} }) - + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) ref := &api.ObjectReference{ Kind: "Pod", Name: "foo", @@ -457,7 +458,7 @@ func TestLotsOfEvents(t *testing.T) { APIVersion: "v1beta1", } for i := 0; i < maxQueuedEvents; i++ { - go FromSource(testSource).Event(ref, "Reason", strconv.Itoa(i)) + go recorder.Eventf(ref, "Reason", strconv.Itoa(i)) } // Make sure no events were dropped by either of the listeners. for i := 0; i < maxQueuedEvents; i++ { @@ -470,6 +471,6 @@ func TestLotsOfEvents(t *testing.T) { t.Errorf("Only attempted to record event '%d' %d times.", i, counts[i]) } } - recorder.Stop() - logger.Stop() + sinkWatcher.Stop() + logWatcher.Stop() } diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 4dc87cb1a76..457d1399460 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -91,9 +91,16 @@ func NewNodeController( staticResources *api.NodeResources, kubeClient client.Interface, kubeletClient client.KubeletClient, - recorder record.EventRecorder, registerRetryCount int, podEvictionTimeout time.Duration) *NodeController { + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}) + if kubeClient != nil { + glog.Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + } else { + glog.Infof("No api server defined - no events will be sent to API server.") + } return &NodeController{ cloud: cloud, matchRE: matchRE, @@ -125,7 +132,6 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus // Register intial set of nodes with their status set. var nodes *api.NodeList var err error - record.StartRecording(nc.kubeClient.Events("")) if nc.isRunningCloudProvider() { if syncNodeList { if nodes, err = nc.GetCloudNodesWithSpec(); err != nil { diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 53f9fc67297..2d8ae05cb92 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -30,7 +30,6 @@ import ( apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -248,7 +247,7 @@ func TestRegisterNodes(t *testing.T) { for _, machine := range item.machines { nodes.Items = append(nodes.Items, *newNode(machine)) } - nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, nil, 10, time.Minute) + nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond) if !item.expectedFail && err != nil { t.Errorf("unexpected error: %v", err) @@ -333,7 +332,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) { }, } for _, item := range table { - nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, nil, 10, time.Minute) + nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute) nodes, err := nodeController.GetStaticNodesWithSpec() if err != nil { t.Errorf("unexpected error: %v", err) @@ -394,7 +393,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, nil, 10, time.Minute) + nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute) nodes, err := nodeController.GetCloudNodesWithSpec() if err != nil { t.Errorf("unexpected error: %v", err) @@ -491,7 +490,7 @@ func TestSyncCloudNodes(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, nil, 10, time.Minute) + nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -573,7 +572,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, nil, 10, time.Minute) + nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -675,7 +674,7 @@ func TestNodeConditionsCheck(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, nil, 10, time.Minute) + nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute) nodeController.now = func() util.Time { return fakeNow } conditions := nodeController.DoCheck(item.node) if !reflect.DeepEqual(item.expectedConditions, conditions) { @@ -706,7 +705,7 @@ func TestPopulateNodeAddresses(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, nil, 10, time.Minute) + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute) result, err := nodeController.PopulateAddresses(item.nodes) // In case of IP querying error, we should continue. if err != nil { @@ -809,7 +808,7 @@ func TestSyncProbedNodeStatus(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, nil, 10, time.Minute) + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -912,7 +911,7 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, &record.FakeRecorder{}, 10, time.Minute) + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } nodeController.now = func() util.Time { return fakeNow } if err := nodeController.SyncProbedNodeStatus(); err != nil { @@ -1065,7 +1064,7 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, &record.FakeRecorder{}, 10, 5*time.Minute) + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute) nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -1223,7 +1222,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, &record.FakeRecorder{}, 10, item.evictionTimeout) + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -1405,7 +1404,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, &record.FakeRecorder{}, 10, 5*time.Minute) + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 11ec7a3b6d5..c9288b5d715 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -76,7 +76,8 @@ func CreatePodUpdate(op kubelet.PodOperation, source string, pods ...api.Pod) ku } func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubelet.PodUpdate, *PodConfig) { - config := NewPodConfig(mode, record.FromSource(api.EventSource{Component: "kubelet"})) + eventBroadcaster := record.NewBroadcaster() + config := NewPodConfig(mode, eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet"})) channel := config.Channel(TestSource) ch := config.Updates() return channel, ch, config diff --git a/pkg/kubelet/util.go b/pkg/kubelet/util.go index 2c23c2d46a8..1af0a87bfed 100644 --- a/pkg/kubelet/util.go +++ b/pkg/kubelet/util.go @@ -20,9 +20,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - "github.com/golang/glog" cadvisorApi "github.com/google/cadvisor/info/v1" ) @@ -34,17 +31,6 @@ func SetupCapabilities(allowPrivileged bool, hostNetworkSources []string) { }) } -// TODO: Split this up? -func SetupLogging() { - // Log the events locally too. - record.StartLogging(glog.Infof) -} - -func SetupEventSending(client *client.Client, hostname string) { - glog.Infof("Sending events to api server.") - record.StartRecording(client.Events("")) -} - func CapacityFromMachineInfo(info *cadvisorApi.MachineInfo) api.ResourceList { c := api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity( diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index ead60f5d7ed..98f4f774fdc 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -26,6 +26,7 @@ import ( "os" "strconv" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" @@ -77,8 +78,6 @@ func (s *SchedulerServer) Run(_ []string) error { glog.Fatalf("Invalid API configuration: %v", err) } - record.StartRecording(kubeClient.Events("")) - go func() { if s.EnableProfiling { mux := http.NewServeMux() @@ -95,6 +94,10 @@ func (s *SchedulerServer) Run(_ []string) error { glog.Fatalf("Failed to create scheduler configuration: %v", err) } + eventBroadcaster := record.NewBroadcaster() + config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + sched := scheduler.New(config) sched.Run() diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 30b5dd60f5a..ff869e3126d 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -26,7 +26,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -194,8 +193,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe glog.V(2).Infof("About to try and schedule pod %v", pod.Name) return pod }, - Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), - Recorder: record.FromSource(api.EventSource{Component: "scheduler"}), + Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), }, nil } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index ef07d4e3139..22b0126a635 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -56,7 +56,8 @@ func (es mockScheduler) Schedule(pod api.Pod, ml scheduler.MinionLister) (string } func TestScheduler(t *testing.T) { - defer record.StartLogging(t.Logf).Stop() + eventBroadcaster := record.NewBroadcaster() + defer eventBroadcaster.StartLogging(t.Logf).Stop() errS := errors.New("scheduler") errB := errors.New("binder") @@ -119,11 +120,11 @@ func TestScheduler(t *testing.T) { NextPod: func() *api.Pod { return item.sendPod }, - Recorder: record.FromSource(api.EventSource{Component: "scheduler"}), + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), } s := New(c) called := make(chan struct{}) - events := record.GetEvents(func(e *api.Event) { + events := eventBroadcaster.StartEventWatcher(func(e *api.Event) { if e, a := item.eventReason, e.Reason; e != a { t.Errorf("%v: expected %v, got %v", i, e, a) }