diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index c6b728106f5..2c6c1fc65e1 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -157,8 +157,13 @@ func Run(s *options.CMServer) error { glog.Fatal(server.ListenAndServe()) }() + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"}) + run := func(stop <-chan struct{}) { - err := StartControllers(s, kubeClient, kubeconfig, stop) + err := StartControllers(s, kubeClient, kubeconfig, stop, recorder) glog.Fatalf("error running controllers: %v", err) panic("unreachable") } @@ -168,11 +173,6 @@ func Run(s *options.CMServer) error { panic("unreachable") } - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) - recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"}) - id, err := os.Hostname() if err != nil { return err @@ -199,7 +199,7 @@ func Run(s *options.CMServer) error { panic("unreachable") } -func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { +func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error { sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)()) go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). @@ -447,7 +447,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig sharedInformers.PersistentVolumeClaims().Informer(), sharedInformers.PersistentVolumes().Informer(), cloud, - ProbeAttachableVolumePlugins(s.VolumeConfiguration)) + ProbeAttachableVolumePlugins(s.VolumeConfiguration), + recorder) if attachDetachControllerErr != nil { glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr) } diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 0d6dbce4ede..08c12374ae7 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -26,6 +26,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" @@ -70,7 +71,8 @@ func NewAttachDetachController( pvcInformer framework.SharedInformer, pvInformer framework.SharedInformer, cloud cloudprovider.Interface, - plugins []volume.VolumePlugin) (AttachDetachController, error) { + plugins []volume.VolumePlugin, + recorder record.EventRecorder) (AttachDetachController, error) { // TODO: The default resyncPeriod for shared informers is 12 hours, this is // unacceptable for the attach/detach controller. For example, if a pod is // skipped because the node it is scheduled to didn't set its annotation in @@ -113,7 +115,8 @@ func NewAttachDetachController( adc.attacherDetacher = operationexecutor.NewOperationExecutor( kubeClient, - &adc.volumePluginMgr) + &adc.volumePluginMgr, + recorder) adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater( kubeClient, nodeInformer, adc.actualStateOfWorld) adc.reconciler = reconciler.NewReconciler( @@ -184,6 +187,9 @@ type attachDetachController struct { // desiredStateOfWorldPopulator runs an asynchronous periodic loop to // populate the current pods using podInformer. desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator + + // recorder is used to record events in the API server + recorder record.EventRecorder } func (adc *attachDetachController) Run(stopCh <-chan struct{}) { diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 5fad9405db7..ca54bb11616 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller/framework/informers" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" ) @@ -32,6 +33,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod) pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod) pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod) + fakeRecorder := &record.FakeRecorder{} // Act _, err := NewAttachDetachController( @@ -41,7 +43,8 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { pvcInformer, pvInformer, nil, /* cloud */ - nil /* plugins */) + nil, /* plugins */ + fakeRecorder) // Assert if err != nil { diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index 4b2e564518c..d71ce1fcfa3 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -21,6 +21,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" @@ -45,8 +46,9 @@ func Test_Run_Positive_DoNothing(t *testing.T) { dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() + fakeRecorder := &record.FakeRecorder{} ad := operationexecutor.NewOperationExecutor( - fakeKubeClient, volumePluginMgr) + fakeKubeClient, volumePluginMgr, fakeRecorder) nodeInformer := informers.NewNodeInformer( fakeKubeClient, resyncPeriod) nsu := statusupdater.NewNodeStatusUpdater( @@ -76,7 +78,8 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() - ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) @@ -121,7 +124,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() - ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) @@ -187,7 +191,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() - ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) @@ -253,7 +258,8 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() - ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) diff --git a/pkg/kubelet/events/event.go b/pkg/kubelet/events/event.go index 2db0419d608..c878062b088 100644 --- a/pkg/kubelet/events/event.go +++ b/pkg/kubelet/events/event.go @@ -40,7 +40,12 @@ const ( NodeNotSchedulable = "NodeNotSchedulable" StartingKubelet = "Starting" KubeletSetupFailed = "KubeletSetupFailed" + FailedDetachVolume = "FailedDetachVolume" FailedMountVolume = "FailedMount" + FailedUnMountVolume = "FailedUnMount" + SuccessfulDetachVolume = "SuccessfulDetachVolume" + SuccessfulMountVolume = "SuccessfulMountVolume" + SuccessfulUnMountVolume = "SuccessfulUnMountVolume" HostPortConflict = "HostPortConflict" NodeSelectorMismatching = "NodeSelectorMismatching" InsufficientFreeCPU = "InsufficientFreeCPU" diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7e53cfec2fa..b17b64ea7e8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -530,7 +530,8 @@ func NewMainKubelet( klet.volumePluginMgr, klet.containerRuntime, mounter, - klet.getPodsDir()) + klet.getPodsDir(), + recorder) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 8412ad2efce..8ebc6b6e765 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -249,7 +249,8 @@ func newTestKubeletWithImageList( kubelet.volumePluginMgr, fakeRuntime, kubelet.mounter, - kubelet.getPodsDir()) + kubelet.getPodsDir(), + kubelet.recorder) if err != nil { t.Fatalf("failed to initialize volume manager: %v", err) } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index e368056cfde..aca17e51c92 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -100,7 +100,8 @@ func TestRunOnce(t *testing.T) { kb.volumePluginMgr, fakeRuntime, kb.mounter, - kb.getPodsDir()) + kb.getPodsDir(), + kb.recorder) kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR) // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index aa9e61b7a14..661c50830b2 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" @@ -57,7 +58,8 @@ func Test_Run_Positive_DoNothing(t *testing.T) { dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) kubeClient := createTestClient() - oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder) reconciler := NewReconciler( kubeClient, false, /* controllerAttachDetachEnabled */ @@ -93,7 +95,8 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) kubeClient := createTestClient() - oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder) reconciler := NewReconciler( kubeClient, false, /* controllerAttachDetachEnabled */ @@ -163,7 +166,8 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) kubeClient := createTestClient() - oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder) reconciler := NewReconciler( kubeClient, true, /* controllerAttachDetachEnabled */ @@ -234,7 +238,8 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) kubeClient := createTestClient() - oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder) reconciler := NewReconciler( kubeClient, false, /* controllerAttachDetachEnabled */ @@ -316,7 +321,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) kubeClient := createTestClient() - oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) + fakeRecorder := &record.FakeRecorder{} + oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder) reconciler := NewReconciler( kubeClient, true, /* controllerAttachDetachEnabled */ diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 353887af68b..d2911173920 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -148,7 +149,9 @@ func NewVolumeManager( volumePluginMgr *volume.VolumePluginMgr, kubeContainerRuntime kubecontainer.Runtime, mounter mount.Interface, - kubeletPodsDir string) (VolumeManager, error) { + kubeletPodsDir string, + recorder record.EventRecorder) (VolumeManager, error) { + vm := &volumeManager{ kubeClient: kubeClient, volumePluginMgr: volumePluginMgr, @@ -156,7 +159,8 @@ func NewVolumeManager( actualStateOfWorld: cache.NewActualStateOfWorld(hostName, volumePluginMgr), operationExecutor: operationexecutor.NewOperationExecutor( kubeClient, - volumePluginMgr), + volumePluginMgr, + recorder), } vm.reconciler = reconciler.NewReconciler( diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index db9a40c03c2..b2b17d42f2c 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/kubelet/config" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/pod" @@ -181,6 +182,7 @@ func newTestVolumeManager( podManager pod.Manager, kubeClient internalclientset.Interface) (VolumeManager, error) { plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} + fakeRecorder := &record.FakeRecorder{} plugMgr := &volume.VolumePluginMgr{} plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil, "" /* rootContext */)) @@ -192,7 +194,9 @@ func newTestVolumeManager( plugMgr, &containertest.FakeRuntime{}, &mount.FakeMounter{}, - "") + "", + fakeRecorder) + return vm, err } diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 8d245c2279c..28ebd631300 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -28,6 +28,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/record" + kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" @@ -109,12 +111,15 @@ type OperationExecutor interface { // NewOperationExecutor returns a new instance of OperationExecutor. func NewOperationExecutor( kubeClient internalclientset.Interface, - volumePluginMgr *volume.VolumePluginMgr) OperationExecutor { + volumePluginMgr *volume.VolumePluginMgr, + recorder record.EventRecorder) OperationExecutor { + return &operationExecutor{ kubeClient: kubeClient, volumePluginMgr: volumePluginMgr, pendingOperations: nestedpendingoperations.NewNestedPendingOperations( true /* exponentialBackOffOnError */), + recorder: recorder, } } @@ -342,6 +347,9 @@ type operationExecutor struct { // pendingOperations keeps track of pending attach and detach operations so // multiple operations are not started on the same volume pendingOperations nestedpendingoperations.NestedPendingOperations + + // recorder is used to record events in the API server + recorder record.EventRecorder } func (oe *operationExecutor) IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool { @@ -722,13 +730,15 @@ func (oe *operationExecutor) generateMountVolumeFunc( deviceMountPath) if err != nil { // On failure, return error. Caller will log and retry. - return fmt.Errorf( + err := fmt.Errorf( "MountVolume.MountDevice failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, volumeToMount.Pod.UID, err) + oe.recorder.Eventf(volumeToMount.Pod, api.EventTypeWarning, kevents.FailedMountVolume, err.Error()) + return err } glog.Infof( @@ -757,13 +767,15 @@ func (oe *operationExecutor) generateMountVolumeFunc( mountErr := volumeMounter.SetUp(fsGroup) if mountErr != nil { // On failure, return error. Caller will log and retry. - return fmt.Errorf( + err := fmt.Errorf( "MountVolume.SetUp failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.PodName, volumeToMount.Pod.UID, mountErr) + oe.recorder.Eventf(volumeToMount.Pod, api.EventTypeWarning, kevents.FailedMountVolume, err.Error()) + return err } glog.Infof(