Merge pull request #27778 from screeley44/k8-vol-executor

Automatic merge from submit-queue

Add Events for operation_executor to show status of mounts, failed/successful to show in describe events

Fixes #27590 
@saad-ali @pmorie @erinboyd

After talking with @pmorie last week about the above issue, I decided to poke around and see if I could remedy.  The refactoring broke my previous UXP merged PR's that correctly showed failed mount errors in the describe events.  However, Not sure I implemented correctly, but it tested out and seems to be working, let me know what I missed or if this is not the correct approach.

```
Events:
  FirstSeen	LastSeen	Count	From			SubobjectPath	Type		Reason		Message
  ---------	--------	-----	----			-------------	--------	------		-------
  2m		2m		1	{default-scheduler }			Normal		Scheduled	Successfully assigned nfs-bb-pod1 to 127.0.0.1
  44s		44s		1	{kubelet 127.0.0.1}			Warning		FailedMount	Unable to mount volumes for pod "nfs-bb-pod1_default(a94f64f1-37c9-11e6-9aa5-52540073d346)": timeout expired waiting for volumes to attach/mount for pod "nfs-bb-pod1"/"default". list of unattached/unmounted volumes=[nfsvol]
  44s		44s		1	{kubelet 127.0.0.1}			Warning		FailedSync	Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod "nfs-bb-pod1"/"default". list of unattached/unmounted volumes=[nfsvol]
  38s		38s		1	{kubelet }				Warning		FailedMount	Unable to mount volumes for pod "a94f64f1-37c9-11e6-9aa5-52540073d346": Mount failed: exit status 32
Mounting arguments: nfs1.rhs:/opt/data99 /var/lib/kubelet/pods/a94f64f1-37c9-11e6-9aa5-52540073d346/volumes/kubernetes.io~nfs/nfsvol nfs []
Output: mount.nfs: Connection timed out

Resolution hint: Check and make sure the NFS Server exists (ensure that correct IPAddress/Hostname was given) and is available/reachable.
Also make sure firewall ports are open on both client and NFS Server (2049 v4 and 2049, 20048 and 111 for v3).
Use commands telnet <nfs server> <port> and showmount <nfs server> to help test connectivity.
```
This commit is contained in:
Kubernetes Submit Queue 2016-08-19 08:27:48 -07:00 committed by GitHub
commit 6ce405c6ee
12 changed files with 80 additions and 30 deletions

View File

@ -157,8 +157,13 @@ func Run(s *options.CMServer) error {
glog.Fatal(server.ListenAndServe()) glog.Fatal(server.ListenAndServe())
}() }()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})
run := func(stop <-chan struct{}) { run := func(stop <-chan struct{}) {
err := StartControllers(s, kubeClient, kubeconfig, stop) err := StartControllers(s, kubeClient, kubeconfig, stop, recorder)
glog.Fatalf("error running controllers: %v", err) glog.Fatalf("error running controllers: %v", err)
panic("unreachable") panic("unreachable")
} }
@ -168,11 +173,6 @@ func Run(s *options.CMServer) error {
panic("unreachable") panic("unreachable")
} }
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})
id, err := os.Hostname() id, err := os.Hostname()
if err != nil { if err != nil {
return err return err
@ -199,7 +199,7 @@ func Run(s *options.CMServer) error {
panic("unreachable") panic("unreachable")
} }
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error {
sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)()) sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)())
go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
@ -447,7 +447,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
sharedInformers.PersistentVolumeClaims().Informer(), sharedInformers.PersistentVolumeClaims().Informer(),
sharedInformers.PersistentVolumes().Informer(), sharedInformers.PersistentVolumes().Informer(),
cloud, cloud,
ProbeAttachableVolumePlugins(s.VolumeConfiguration)) ProbeAttachableVolumePlugins(s.VolumeConfiguration),
recorder)
if attachDetachControllerErr != nil { if attachDetachControllerErr != nil {
glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr) glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr)
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
@ -70,7 +71,8 @@ func NewAttachDetachController(
pvcInformer framework.SharedInformer, pvcInformer framework.SharedInformer,
pvInformer framework.SharedInformer, pvInformer framework.SharedInformer,
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
plugins []volume.VolumePlugin) (AttachDetachController, error) { plugins []volume.VolumePlugin,
recorder record.EventRecorder) (AttachDetachController, error) {
// TODO: The default resyncPeriod for shared informers is 12 hours, this is // TODO: The default resyncPeriod for shared informers is 12 hours, this is
// unacceptable for the attach/detach controller. For example, if a pod is // unacceptable for the attach/detach controller. For example, if a pod is
// skipped because the node it is scheduled to didn't set its annotation in // skipped because the node it is scheduled to didn't set its annotation in
@ -113,7 +115,8 @@ func NewAttachDetachController(
adc.attacherDetacher = adc.attacherDetacher =
operationexecutor.NewOperationExecutor( operationexecutor.NewOperationExecutor(
kubeClient, kubeClient,
&adc.volumePluginMgr) &adc.volumePluginMgr,
recorder)
adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater( adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater(
kubeClient, nodeInformer, adc.actualStateOfWorld) kubeClient, nodeInformer, adc.actualStateOfWorld)
adc.reconciler = reconciler.NewReconciler( adc.reconciler = reconciler.NewReconciler(
@ -184,6 +187,9 @@ type attachDetachController struct {
// desiredStateOfWorldPopulator runs an asynchronous periodic loop to // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the current pods using podInformer. // populate the current pods using podInformer.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
// recorder is used to record events in the API server
recorder record.EventRecorder
} }
func (adc *attachDetachController) Run(stopCh <-chan struct{}) { func (adc *attachDetachController) Run(stopCh <-chan struct{}) {

View File

@ -20,6 +20,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
) )
@ -32,6 +33,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod) nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod)
pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod) pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod)
pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod) pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod)
fakeRecorder := &record.FakeRecorder{}
// Act // Act
_, err := NewAttachDetachController( _, err := NewAttachDetachController(
@ -41,7 +43,8 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
pvcInformer, pvcInformer,
pvInformer, pvInformer,
nil, /* cloud */ nil, /* cloud */
nil /* plugins */) nil, /* plugins */
fakeRecorder)
// Assert // Assert
if err != nil { if err != nil {

View File

@ -21,6 +21,7 @@ import (
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
@ -45,8 +46,9 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient() fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor( ad := operationexecutor.NewOperationExecutor(
fakeKubeClient, volumePluginMgr) fakeKubeClient, volumePluginMgr, fakeRecorder)
nodeInformer := informers.NewNodeInformer( nodeInformer := informers.NewNodeInformer(
fakeKubeClient, resyncPeriod) fakeKubeClient, resyncPeriod)
nsu := statusupdater.NewNodeStatusUpdater( nsu := statusupdater.NewNodeStatusUpdater(
@ -76,7 +78,8 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient() fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler( reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
@ -121,7 +124,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient() fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler( reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
@ -187,7 +191,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient() fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler( reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
@ -253,7 +258,8 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient() fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */) nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
reconciler := NewReconciler( reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)

View File

@ -40,7 +40,12 @@ const (
NodeNotSchedulable = "NodeNotSchedulable" NodeNotSchedulable = "NodeNotSchedulable"
StartingKubelet = "Starting" StartingKubelet = "Starting"
KubeletSetupFailed = "KubeletSetupFailed" KubeletSetupFailed = "KubeletSetupFailed"
FailedDetachVolume = "FailedDetachVolume"
FailedMountVolume = "FailedMount" FailedMountVolume = "FailedMount"
FailedUnMountVolume = "FailedUnMount"
SuccessfulDetachVolume = "SuccessfulDetachVolume"
SuccessfulMountVolume = "SuccessfulMountVolume"
SuccessfulUnMountVolume = "SuccessfulUnMountVolume"
HostPortConflict = "HostPortConflict" HostPortConflict = "HostPortConflict"
NodeSelectorMismatching = "NodeSelectorMismatching" NodeSelectorMismatching = "NodeSelectorMismatching"
InsufficientFreeCPU = "InsufficientFreeCPU" InsufficientFreeCPU = "InsufficientFreeCPU"

View File

@ -530,7 +530,8 @@ func NewMainKubelet(
klet.volumePluginMgr, klet.volumePluginMgr,
klet.containerRuntime, klet.containerRuntime,
mounter, mounter,
klet.getPodsDir()) klet.getPodsDir(),
recorder)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil { if err != nil {

View File

@ -249,7 +249,8 @@ func newTestKubeletWithImageList(
kubelet.volumePluginMgr, kubelet.volumePluginMgr,
fakeRuntime, fakeRuntime,
kubelet.mounter, kubelet.mounter,
kubelet.getPodsDir()) kubelet.getPodsDir(),
kubelet.recorder)
if err != nil { if err != nil {
t.Fatalf("failed to initialize volume manager: %v", err) t.Fatalf("failed to initialize volume manager: %v", err)
} }

View File

@ -100,7 +100,8 @@ func TestRunOnce(t *testing.T) {
kb.volumePluginMgr, kb.volumePluginMgr,
fakeRuntime, fakeRuntime,
kb.mounter, kb.mounter,
kb.getPodsDir()) kb.getPodsDir(),
kb.recorder)
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR) kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR)
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency

View File

@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
@ -57,7 +58,8 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient() kubeClient := createTestClient()
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder)
reconciler := NewReconciler( reconciler := NewReconciler(
kubeClient, kubeClient,
false, /* controllerAttachDetachEnabled */ false, /* controllerAttachDetachEnabled */
@ -93,7 +95,8 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient() kubeClient := createTestClient()
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder)
reconciler := NewReconciler( reconciler := NewReconciler(
kubeClient, kubeClient,
false, /* controllerAttachDetachEnabled */ false, /* controllerAttachDetachEnabled */
@ -163,7 +166,8 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient() kubeClient := createTestClient()
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder)
reconciler := NewReconciler( reconciler := NewReconciler(
kubeClient, kubeClient,
true, /* controllerAttachDetachEnabled */ true, /* controllerAttachDetachEnabled */
@ -234,7 +238,8 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient() kubeClient := createTestClient()
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder)
reconciler := NewReconciler( reconciler := NewReconciler(
kubeClient, kubeClient,
false, /* controllerAttachDetachEnabled */ false, /* controllerAttachDetachEnabled */
@ -316,7 +321,8 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient() kubeClient := createTestClient()
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr) fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder)
reconciler := NewReconciler( reconciler := NewReconciler(
kubeClient, kubeClient,
true, /* controllerAttachDetachEnabled */ true, /* controllerAttachDetachEnabled */

View File

@ -24,6 +24,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -148,7 +149,9 @@ func NewVolumeManager(
volumePluginMgr *volume.VolumePluginMgr, volumePluginMgr *volume.VolumePluginMgr,
kubeContainerRuntime kubecontainer.Runtime, kubeContainerRuntime kubecontainer.Runtime,
mounter mount.Interface, mounter mount.Interface,
kubeletPodsDir string) (VolumeManager, error) { kubeletPodsDir string,
recorder record.EventRecorder) (VolumeManager, error) {
vm := &volumeManager{ vm := &volumeManager{
kubeClient: kubeClient, kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr, volumePluginMgr: volumePluginMgr,
@ -156,7 +159,8 @@ func NewVolumeManager(
actualStateOfWorld: cache.NewActualStateOfWorld(hostName, volumePluginMgr), actualStateOfWorld: cache.NewActualStateOfWorld(hostName, volumePluginMgr),
operationExecutor: operationexecutor.NewOperationExecutor( operationExecutor: operationexecutor.NewOperationExecutor(
kubeClient, kubeClient,
volumePluginMgr), volumePluginMgr,
recorder),
} }
vm.reconciler = reconciler.NewReconciler( vm.reconciler = reconciler.NewReconciler(

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/pod"
@ -181,6 +182,7 @@ func newTestVolumeManager(
podManager pod.Manager, podManager pod.Manager,
kubeClient internalclientset.Interface) (VolumeManager, error) { kubeClient internalclientset.Interface) (VolumeManager, error) {
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
fakeRecorder := &record.FakeRecorder{}
plugMgr := &volume.VolumePluginMgr{} plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil, "" /* rootContext */)) plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil, "" /* rootContext */))
@ -192,7 +194,9 @@ func newTestVolumeManager(
plugMgr, plugMgr,
&containertest.FakeRuntime{}, &containertest.FakeRuntime{},
&mount.FakeMounter{}, &mount.FakeMounter{},
"") "",
fakeRecorder)
return vm, err return vm, err
} }

View File

@ -28,6 +28,8 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
@ -109,12 +111,15 @@ type OperationExecutor interface {
// NewOperationExecutor returns a new instance of OperationExecutor. // NewOperationExecutor returns a new instance of OperationExecutor.
func NewOperationExecutor( func NewOperationExecutor(
kubeClient internalclientset.Interface, kubeClient internalclientset.Interface,
volumePluginMgr *volume.VolumePluginMgr) OperationExecutor { volumePluginMgr *volume.VolumePluginMgr,
recorder record.EventRecorder) OperationExecutor {
return &operationExecutor{ return &operationExecutor{
kubeClient: kubeClient, kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr, volumePluginMgr: volumePluginMgr,
pendingOperations: nestedpendingoperations.NewNestedPendingOperations( pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
true /* exponentialBackOffOnError */), true /* exponentialBackOffOnError */),
recorder: recorder,
} }
} }
@ -342,6 +347,9 @@ type operationExecutor struct {
// pendingOperations keeps track of pending attach and detach operations so // pendingOperations keeps track of pending attach and detach operations so
// multiple operations are not started on the same volume // multiple operations are not started on the same volume
pendingOperations nestedpendingoperations.NestedPendingOperations pendingOperations nestedpendingoperations.NestedPendingOperations
// recorder is used to record events in the API server
recorder record.EventRecorder
} }
func (oe *operationExecutor) IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool { func (oe *operationExecutor) IsOperationPending(volumeName api.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
@ -722,13 +730,15 @@ func (oe *operationExecutor) generateMountVolumeFunc(
deviceMountPath) deviceMountPath)
if err != nil { if err != nil {
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
return fmt.Errorf( err := fmt.Errorf(
"MountVolume.MountDevice failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", "MountVolume.MountDevice failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName, volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(), volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName, volumeToMount.PodName,
volumeToMount.Pod.UID, volumeToMount.Pod.UID,
err) err)
oe.recorder.Eventf(volumeToMount.Pod, api.EventTypeWarning, kevents.FailedMountVolume, err.Error())
return err
} }
glog.Infof( glog.Infof(
@ -757,13 +767,15 @@ func (oe *operationExecutor) generateMountVolumeFunc(
mountErr := volumeMounter.SetUp(fsGroup) mountErr := volumeMounter.SetUp(fsGroup)
if mountErr != nil { if mountErr != nil {
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
return fmt.Errorf( err := fmt.Errorf(
"MountVolume.SetUp failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", "MountVolume.SetUp failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName, volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(), volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName, volumeToMount.PodName,
volumeToMount.Pod.UID, volumeToMount.Pod.UID,
mountErr) mountErr)
oe.recorder.Eventf(volumeToMount.Pod, api.EventTypeWarning, kevents.FailedMountVolume, err.Error())
return err
} }
glog.Infof( glog.Infof(