diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index 75b4c7f1ec9..ca598e6d086 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -31,6 +31,7 @@ go_library( "//pkg/volume/util/operationexecutor:go_default_library", "//pkg/volume/util/volumehelper:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", @@ -46,9 +47,14 @@ go_test( library = ":go_default_library", tags = ["automanaged"], deps = [ + "//pkg/api/v1:go_default_library", "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/controller:go_default_library", + "//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", ], ) diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 1574e991783..3cdf47db891 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -24,6 +24,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -102,12 +103,16 @@ func NewAttachDetachController( // dropped pods so they are continuously processed until it is accepted or // deleted (probably can't do this with sharedInformer), etc. adc := &attachDetachController{ - kubeClient: kubeClient, - pvcLister: pvcInformer.Lister(), - pvcsSynced: pvcInformer.Informer().HasSynced, - pvLister: pvInformer.Lister(), - pvsSynced: pvInformer.Informer().HasSynced, - cloud: cloud, + kubeClient: kubeClient, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvsSynced: pvInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podsSynced: podInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + cloud: cloud, } if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil { @@ -155,14 +160,12 @@ func NewAttachDetachController( UpdateFunc: adc.podUpdate, DeleteFunc: adc.podDelete, }) - adc.podsSynced = podInformer.Informer().HasSynced nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ AddFunc: adc.nodeAdd, UpdateFunc: adc.nodeUpdate, DeleteFunc: adc.nodeDelete, }) - adc.nodesSynced = nodeInformer.Informer().HasSynced return adc, nil } @@ -184,7 +187,10 @@ type attachDetachController struct { pvLister corelisters.PersistentVolumeLister pvsSynced kcache.InformerSynced - podsSynced kcache.InformerSynced + podLister corelisters.PodLister + podsSynced kcache.InformerSynced + + nodeLister corelisters.NodeLister nodesSynced kcache.InformerSynced // cloud provider used by volume host @@ -239,12 +245,136 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { return } + err := adc.populateActualStateOfWorld() + if err != nil { + glog.Errorf("Error populating the actual state of world: %v", err) + } + err = adc.populateDesiredStateOfWorld() + if err != nil { + glog.Errorf("Error populating the desired state of world: %v", err) + } go adc.reconciler.Run(stopCh) go adc.desiredStateOfWorldPopulator.Run(stopCh) <-stopCh } +func (adc *attachDetachController) populateActualStateOfWorld() error { + glog.V(5).Infof("Populating ActualStateOfworld") + nodes, err := adc.nodeLister.List(labels.Everything()) + if err != nil { + return err + } + + for _, node := range nodes { + nodeName := types.NodeName(node.Name) + for _, attachedVolume := range node.Status.VolumesAttached { + uniqueName := attachedVolume.Name + // The nil VolumeSpec is safe only in the case the volume is not in use by any pod. + // In such a case it should be detached in the first reconciliation cycle and the + // volume spec is not needed to detach a volume. If the volume is used by a pod, it + // its spec can be: this would happen during in the populateDesiredStateOfWorld which + // scans the pods and updates their volumes in the ActualStateOfWorld too. + err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath) + if err != nil { + glog.Errorf("Failed to mark the volume as attached: %v", err) + continue + } + adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, true /* forceUnmount */) + if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists { + // Node specifies annotation indicating it should be managed by + // attach detach controller. Add it to desired state of world. + adc.desiredStateOfWorld.AddNode(types.NodeName(node.Name)) // Needed for DesiredStateOfWorld population + } + } + } + return nil +} + +func (adc *attachDetachController) getNodeVolumeDevicePath( + volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) { + var devicePath string + var found bool + node, err := adc.nodeLister.Get(string(nodeName)) + if err != nil { + return devicePath, err + } + for _, attachedVolume := range node.Status.VolumesAttached { + if volumeName == attachedVolume.Name { + devicePath = attachedVolume.DevicePath + found = true + break + } + } + if !found { + err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName) + } + + return devicePath, err +} + +func (adc *attachDetachController) populateDesiredStateOfWorld() error { + glog.V(5).Infof("Populating DesiredStateOfworld") + + pods, err := adc.podLister.List(labels.Everything()) + if err != nil { + return err + } + for _, pod := range pods { + podToAdd := pod + adc.podAdd(&podToAdd) + for _, podVolume := range podToAdd.Spec.Volumes { + // The volume specs present in the ActualStateOfWorld are nil, let's replace those + // with the correct ones found on pods. The present in the ASW with no corresponding + // pod will be detached and the spec is irrelevant. + volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister) + if err != nil { + glog.Errorf( + "Error creating spec for volume %q, pod %q/%q: %v", + podVolume.Name, + podToAdd.Namespace, + podToAdd.Name, + err) + continue + } + nodeName := types.NodeName(podToAdd.Spec.NodeName) + plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) + if err != nil || plugin == nil { + glog.V(10).Infof( + "Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v", + podVolume.Name, + podToAdd.Namespace, + podToAdd.Name, + err) + continue + } + volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec) + if err != nil { + glog.Errorf( + "Failed to find unique name for volume %q, pod %q/%q: %v", + podVolume.Name, + podToAdd.Namespace, + podToAdd.Name, + err) + continue + } + if adc.actualStateOfWorld.VolumeNodeExists(volumeName, nodeName) { + devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName) + if err != nil { + glog.Errorf("Failed to find device path: %v", err) + continue + } + err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath) + if err != nil { + glog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err) + } + } + } + } + + return nil +} + func (adc *attachDetachController) podAdd(obj interface{}) { pod, ok := obj.(*v1.Pod) if pod == nil || !ok { @@ -308,7 +438,7 @@ func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) { // detach controller. Add it to desired state of world. adc.desiredStateOfWorld.AddNode(nodeName) } - adc.processVolumesInUse(nodeName, node.Status.VolumesInUse) + adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */) } func (adc *attachDetachController) nodeDelete(obj interface{}) { @@ -322,7 +452,7 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) { glog.V(10).Infof("%v", err) } - adc.processVolumesInUse(nodeName, node.Status.VolumesInUse) + adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */) } // processVolumesInUse processes the list of volumes marked as "in-use" @@ -330,7 +460,7 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) { // corresponding volume in the actual state of the world to indicate that it is // mounted. func (adc *attachDetachController) processVolumesInUse( - nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) { + nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName, forceUnmount bool) { glog.V(4).Infof("processVolumesInUse for node %q", nodeName) for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) { mounted := false @@ -341,7 +471,7 @@ func (adc *attachDetachController) processVolumesInUse( } } err := adc.actualStateOfWorld.SetVolumeMountedByNode( - attachedVolume.VolumeName, nodeName, mounted) + attachedVolume.VolumeName, nodeName, mounted, forceUnmount) if err != nil { glog.Warningf( "SetVolumeMountedByNode(%q, %q, %q) returned an error: %v", diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 4a7a8ebfd25..a1a2266d651 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -17,11 +17,17 @@ limitations under the License. package attachdetach import ( + "fmt" "testing" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/api/v1" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" ) @@ -47,3 +53,231 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { t.Fatalf("Run failed with error. Expected: Actual: <%v>", err) } } + +func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) { + // Arrange + fakeKubeClient := controllervolumetesting.CreateTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + podInformer := informerFactory.Core().V1().Pods() + nodeInformer := informerFactory.Core().V1().Nodes() + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := informerFactory.Core().V1().PersistentVolumes() + + adc := &attachDetachController{ + kubeClient: fakeKubeClient, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvsSynced: pvInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podsSynced: podInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + cloud: nil, + } + + // Act + plugins := controllervolumetesting.CreateTestPlugin() + + if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil { + t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) + } + + adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr) + adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr) + + err := adc.populateActualStateOfWorld() + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: <%v>", err) + } + + err = adc.populateDesiredStateOfWorld() + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: %v", err) + } + + // Test the ActualStateOfWorld contains all the node volumes + nodes, err := adc.nodeLister.List(labels.Everything()) + for _, node := range nodes { + nodeName := types.NodeName(node.Name) + for _, attachedVolume := range node.Status.VolumesAttached { + found := adc.actualStateOfWorld.VolumeNodeExists(attachedVolume.Name, nodeName) + if !found { + t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name) + } + } + } + + pods, err := adc.podLister.List(labels.Everything()) + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: %v", err) + } + for _, pod := range pods { + uniqueName := fmt.Sprintf("%s/%s", controllervolumetesting.TestPluginName, pod.Spec.Volumes[0].Name) + nodeName := types.NodeName(pod.Spec.NodeName) + found := adc.desiredStateOfWorld.VolumeExists(v1.UniqueVolumeName(uniqueName), nodeName) + if !found { + t.Fatalf("Run failed with error. Volume %s, node %s not found in DesiredStateOfWorld", + pod.Spec.Volumes[0].Name, + pod.Spec.NodeName) + } + } +} + +func Test_AttachDetachControllerRecovery(t *testing.T) { + attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{}) + newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1") + attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{}) + newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1") + attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{newPod1}) + newPod1 = controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1") + newPod2 := controllervolumetesting.NewPodWithVolume("newpod-2", "volumeName3", "mynode-1") + attachDetachRecoveryTestCase(t, []*v1.Pod{newPod1}, []*v1.Pod{newPod2}) +} + +func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 []*v1.Pod) { + fakeKubeClient := controllervolumetesting.CreateTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1) + //informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1) + plugins := controllervolumetesting.CreateTestPlugin() + nodeInformer := informerFactory.Core().V1().Nodes().Informer() + podInformer := informerFactory.Core().V1().Pods().Informer() + var podsNum, extraPodsNum, nodesNum, i int + + stopCh := make(chan struct{}) + + pods, err := fakeKubeClient.Core().Pods(v1.NamespaceAll).List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: %v", err) + } + + for _, pod := range pods.Items { + podToAdd := pod + podInformer.GetIndexer().Add(&podToAdd) + podsNum++ + } + nodes, err := fakeKubeClient.Core().Nodes().List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: %v", err) + } + for _, node := range nodes.Items { + nodeToAdd := node + nodeInformer.GetIndexer().Add(&nodeToAdd) + nodesNum++ + } + + informerFactory.Start(stopCh) + + // Make sure the nodes and pods are in the inforer cache + i = 0 + nodeList, err := informerFactory.Core().V1().Nodes().Lister().List(labels.Everything()) + for len(nodeList) < nodesNum { + if err != nil { + t.Fatalf("Error getting list of nodes %v", err) + } + if i > 100 { + t.Fatalf("Time out while waiting for the node informer sync: found %d nodes, expected %d nodes", len(nodeList), nodesNum) + } + time.Sleep(100 * time.Millisecond) + nodeList, err = informerFactory.Core().V1().Nodes().Lister().List(labels.Everything()) + i++ + } + i = 0 + podList, err := informerFactory.Core().V1().Pods().Lister().List(labels.Everything()) + for len(podList) < podsNum { + if err != nil { + t.Fatalf("Error getting list of nodes %v", err) + } + if i > 100 { + t.Fatalf("Time out while waiting for the pod informer sync: found %d pods, expected %d pods", len(podList), podsNum) + } + time.Sleep(100 * time.Millisecond) + podList, err = informerFactory.Core().V1().Pods().Lister().List(labels.Everything()) + i++ + } + + // Create the controller + adcObj, err := NewAttachDetachController( + fakeKubeClient, + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().PersistentVolumes(), + nil, /* cloud */ + plugins, + false, + time.Second*1) + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: <%v>", err) + } + + adc := adcObj.(*attachDetachController) + + // Populate ASW + err = adc.populateActualStateOfWorld() + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: <%v>", err) + } + + for _, newPod := range extraPods1 { + // Add a new pod between ASW and DSW ppoulators + _, err = adc.kubeClient.Core().Pods(newPod.ObjectMeta.Namespace).Create(newPod) + if err != nil { + t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err) + } + extraPodsNum++ + podInformer.GetIndexer().Add(newPod) + + } + + // Populate DSW + err = adc.populateDesiredStateOfWorld() + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: %v", err) + } + + for _, newPod := range extraPods2 { + // Add a new pod between DSW ppoulator and reconciler run + _, err = adc.kubeClient.Core().Pods(newPod.ObjectMeta.Namespace).Create(newPod) + if err != nil { + t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err) + } + extraPodsNum++ + podInformer.GetIndexer().Add(newPod) + } + + go adc.reconciler.Run(stopCh) + go adc.desiredStateOfWorldPopulator.Run(stopCh) + defer close(stopCh) + + time.Sleep(time.Second * 1) // Wait so the reconciler calls sync at least once + + testPlugin := plugins[0].(*controllervolumetesting.TestPlugin) + for i = 0; i <= 10; i++ { + var attachedVolumesNum int = 0 + var detachedVolumesNum int = 0 + + time.Sleep(time.Second * 1) // Wait for a second + for _, volumeList := range testPlugin.GetAttachedVolumes() { + attachedVolumesNum += len(volumeList) + } + for _, volumeList := range testPlugin.GetDetachedVolumes() { + detachedVolumesNum += len(volumeList) + } + + // All the "extra pods" should result in volume to be attached, the pods all share one volume + // which should be attached (+1), the volumes found only in the nodes status should be detached + if attachedVolumesNum == 1+extraPodsNum && detachedVolumesNum == nodesNum { + break + } + if i == 10 { // 10 seconds time out + t.Fatalf("Waiting for the volumes to attach/detach timed out: attached %d (expected %d); detached %d (%d)", + attachedVolumesNum, 1+extraPodsNum, detachedVolumesNum, nodesNum) + } + } + + if testPlugin.GetErrorEncountered() { + t.Fatalf("Fatal error encountered in the testing volume plugin") + } + +} diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 5387bec0d9d..db405298221 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -56,16 +56,19 @@ type ActualStateOfWorld interface { // added. // If no node with the name nodeName exists in list of attached nodes for // the specified volume, the node is added. - AddVolumeNode(volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) + AddVolumeNode(uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) // SetVolumeMountedByNode sets the MountedByNode value for the given volume - // and node. When set to true this value indicates the volume is mounted by - // the given node, indicating it may not be safe to detach. + // and node. When set to true the mounted parameter indicates the volume + // is mounted by the given node, indicating it may not be safe to detach. + // If the forceUnmount is set to true the MountedByNode value would be reset + // to false even it was not set yet (this is required during a controller + // crash recovery). // If no volume with the name volumeName exists in the store, an error is // returned. // If no node with the name nodeName exists in list of attached nodes for // the specified volume, an error is returned. - SetVolumeMountedByNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error + SetVolumeMountedByNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool, forceUnmount bool) error // SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified // node to true indicating the AttachedVolume field in the Node's Status @@ -230,8 +233,8 @@ type nodeToUpdateStatusFor struct { } func (asw *actualStateOfWorld) MarkVolumeAsAttached( - _ v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error { - _, err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error { + _, err := asw.AddVolumeNode(uniqueName, volumeSpec, nodeName, devicePath) return err } @@ -255,25 +258,34 @@ func (asw *actualStateOfWorld) AddVolumeToReportAsAttached( } func (asw *actualStateOfWorld) AddVolumeNode( - volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) { + uniqueName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) { asw.Lock() defer asw.Unlock() - attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) - if err != nil || attachableVolumePlugin == nil { - return "", fmt.Errorf( - "failed to get AttachablePlugin from volumeSpec for volume %q err=%v", - volumeSpec.Name(), - err) - } + var volumeName v1.UniqueVolumeName + if volumeSpec != nil { + attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) + if err != nil || attachableVolumePlugin == nil { + return "", fmt.Errorf( + "failed to get AttachablePlugin from volumeSpec for volume %q err=%v", + volumeSpec.Name(), + err) + } - volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec( - attachableVolumePlugin, volumeSpec) - if err != nil { - return "", fmt.Errorf( - "failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v", - volumeSpec.Name(), - err) + volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec( + attachableVolumePlugin, volumeSpec) + if err != nil { + return "", fmt.Errorf( + "failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v", + volumeSpec.Name(), + err) + } + } else { + // volumeSpec is nil + // This happens only on controller startup when reading the volumes from node + // status; if the pods using the volume have been removed and are unreachable + // the volumes should be detached immediately and the spec is not needed + volumeName = uniqueName } volumeObj, volumeExists := asw.attachedVolumes[volumeName] @@ -316,7 +328,7 @@ func (asw *actualStateOfWorld) AddVolumeNode( } func (asw *actualStateOfWorld) SetVolumeMountedByNode( - volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error { + volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool, forceUnmount bool) error { asw.Lock() defer asw.Unlock() @@ -330,7 +342,7 @@ func (asw *actualStateOfWorld) SetVolumeMountedByNode( nodeObj.mountedByNodeSetCount = nodeObj.mountedByNodeSetCount + 1 } else { // Do not allow value to be reset unless it has been set at least once - if nodeObj.mountedByNodeSetCount == 0 { + if nodeObj.mountedByNodeSetCount == 0 && !forceUnmount { return nil } } diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go index 86f04614933..fa19728b335 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go @@ -39,7 +39,7 @@ func Test_AddVolumeNode_Positive_NewVolumeNewNode(t *testing.T) { devicePath := "fake/device/path" // Act - generatedVolumeName, err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, err := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) // Assert if err != nil { @@ -72,8 +72,8 @@ func Test_AddVolumeNode_Positive_ExistingVolumeNewNode(t *testing.T) { devicePath := "fake/device/path" // Act - generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name, devicePath) - generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeSpec, node2Name, devicePath) + generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeName, volumeSpec, node1Name, devicePath) + generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeName, volumeSpec, node2Name, devicePath) // Assert if add1Err != nil { @@ -121,8 +121,8 @@ func Test_AddVolumeNode_Positive_ExistingVolumeExistingNode(t *testing.T) { devicePath := "fake/device/path" // Act - generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) - generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) + generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) // Assert if add1Err != nil { @@ -163,7 +163,7 @@ func Test_DeleteVolumeNode_Positive_VolumeExistsNodeExists(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -220,11 +220,11 @@ func Test_DeleteVolumeNode_Positive_TwoNodesOneDeleted(t *testing.T) { node1Name := types.NodeName("node1-name") node2Name := types.NodeName("node2-name") devicePath := "fake/device/path" - generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name, devicePath) + generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeName, volumeSpec, node1Name, devicePath) if add1Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add1Err) } - generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeSpec, node2Name, devicePath) + generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeName, volumeSpec, node2Name, devicePath) if add2Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add2Err) } @@ -268,7 +268,7 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeExists(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -301,7 +301,7 @@ func Test_VolumeNodeExists_Positive_VolumeExistsNodeDoesntExist(t *testing.T) { node1Name := types.NodeName("node1-name") node2Name := types.NodeName("node2-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, node1Name, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, node1Name, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -372,7 +372,7 @@ func Test_GetAttachedVolumes_Positive_OneVolumeOneNode(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -399,14 +399,14 @@ func Test_GetAttachedVolumes_Positive_TwoVolumeTwoNodes(t *testing.T) { volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) node1Name := types.NodeName("node1-name") devicePath := "fake/device/path" - generatedVolumeName1, add1Err := asw.AddVolumeNode(volume1Spec, node1Name, devicePath) + generatedVolumeName1, add1Err := asw.AddVolumeNode(volume1Name, volume1Spec, node1Name, devicePath) if add1Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add1Err) } volume2Name := v1.UniqueVolumeName("volume2-name") volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name) node2Name := types.NodeName("node2-name") - generatedVolumeName2, add2Err := asw.AddVolumeNode(volume2Spec, node2Name, devicePath) + generatedVolumeName2, add2Err := asw.AddVolumeNode(volume2Name, volume2Spec, node2Name, devicePath) if add2Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add2Err) } @@ -434,12 +434,12 @@ func Test_GetAttachedVolumes_Positive_OneVolumeTwoNodes(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) node1Name := types.NodeName("node1-name") devicePath := "fake/device/path" - generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name, devicePath) + generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeName, volumeSpec, node1Name, devicePath) if add1Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add1Err) } node2Name := types.NodeName("node2-name") - generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeSpec, node2Name, devicePath) + generatedVolumeName2, add2Err := asw.AddVolumeNode(v1.UniqueVolumeName(""), volumeSpec, node2Name, devicePath) if add2Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add2Err) } @@ -473,7 +473,7 @@ func Test_SetVolumeMountedByNode_Positive_Set(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -500,14 +500,14 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } // Act - setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */) - setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) + setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false /* force unmount */) + setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false /* force unmount */) // Assert if setVolumeMountedErr1 != nil { @@ -536,13 +536,13 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } // Act - setVolumeMountedErr := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) + setVolumeMountedErr := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false /* force unmount */) // Assert if setVolumeMountedErr != nil { @@ -569,15 +569,15 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotRes volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } // Act - setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */) - setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) - generatedVolumeName, addErr = asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false /* force unmount */) + setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false /* force unmount */) + generatedVolumeName, addErr = asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) // Assert if setVolumeMountedErr1 != nil { @@ -610,7 +610,7 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -625,8 +625,8 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime // Act - setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */) - setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) + setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false /* force unmount */) + setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false /* force unmount */) // Assert if setVolumeMountedErr1 != nil { @@ -657,7 +657,7 @@ func Test_RemoveVolumeFromReportAsAttached_Positive_Set(t *testing.T) { devicePath := "fake/device/path" volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -684,7 +684,7 @@ func Test_RemoveVolumeFromReportAsAttached_Positive_Marked(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -720,7 +720,7 @@ func Test_MarkDesireToDetach_Positive_MarkedAddVolumeNodeReset(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -763,12 +763,12 @@ func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMou volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } - setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */) - setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) + setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false /* force unmount */) + setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false /* force unmount */) if setVolumeMountedErr1 != nil { t.Fatalf("SetVolumeMountedByNode1 failed. Expected Actual: <%v>", setVolumeMountedErr1) } @@ -806,7 +806,7 @@ func Test_RemoveVolumeFromReportAsAttached(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -839,7 +839,7 @@ func Test_RemoveVolumeFromReportAsAttached_AddVolumeToReportAsAttached_Positive( volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -882,7 +882,7 @@ func Test_RemoveVolumeFromReportAsAttached_Delete_AddVolumeNode(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -903,7 +903,7 @@ func Test_RemoveVolumeFromReportAsAttached_Delete_AddVolumeNode(t *testing.T) { asw.DeleteVolumeNode(generatedVolumeName, nodeName) - asw.AddVolumeNode(volumeSpec, nodeName, "" /*device path*/) + asw.AddVolumeNode(volumeName, volumeSpec, nodeName, "" /*device path*/) reportAsAttachedVolumesMap = asw.GetVolumesToReportAttached() volumes, exists = reportAsAttachedVolumesMap[nodeName] @@ -927,7 +927,7 @@ func Test_SetDetachRequestTime_Positive(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -974,7 +974,7 @@ func Test_GetAttachedVolumesForNode_Positive_OneVolumeOneNode(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) nodeName := types.NodeName("node-name") devicePath := "fake/device/path" - generatedVolumeName, addErr := asw.AddVolumeNode(volumeSpec, nodeName, devicePath) + generatedVolumeName, addErr := asw.AddVolumeNode(volumeName, volumeSpec, nodeName, devicePath) if addErr != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", addErr) } @@ -998,14 +998,14 @@ func Test_GetAttachedVolumesForNode_Positive_TwoVolumeTwoNodes(t *testing.T) { volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name) node1Name := types.NodeName("node1-name") devicePath := "fake/device/path" - _, add1Err := asw.AddVolumeNode(volume1Spec, node1Name, devicePath) + _, add1Err := asw.AddVolumeNode(volume1Name, volume1Spec, node1Name, devicePath) if add1Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add1Err) } volume2Name := v1.UniqueVolumeName("volume2-name") volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name) node2Name := types.NodeName("node2-name") - generatedVolumeName2, add2Err := asw.AddVolumeNode(volume2Spec, node2Name, devicePath) + generatedVolumeName2, add2Err := asw.AddVolumeNode(volume2Name, volume2Spec, node2Name, devicePath) if add2Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add2Err) } @@ -1029,12 +1029,12 @@ func Test_GetAttachedVolumesForNode_Positive_OneVolumeTwoNodes(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) node1Name := types.NodeName("node1-name") devicePath := "fake/device/path" - generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name, devicePath) + generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeName, volumeSpec, node1Name, devicePath) if add1Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add1Err) } node2Name := types.NodeName("node2-name") - generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeSpec, node2Name, devicePath) + generatedVolumeName2, add2Err := asw.AddVolumeNode(v1.UniqueVolumeName(""), volumeSpec, node2Name, devicePath) if add2Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add2Err) } @@ -1065,13 +1065,13 @@ func Test_OneVolumeTwoNodes_TwoDevicePaths(t *testing.T) { volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) node1Name := types.NodeName("node1-name") devicePath1 := "fake/device/path1" - generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeSpec, node1Name, devicePath1) + generatedVolumeName1, add1Err := asw.AddVolumeNode(volumeName, volumeSpec, node1Name, devicePath1) if add1Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add1Err) } node2Name := types.NodeName("node2-name") devicePath2 := "fake/device/path2" - generatedVolumeName2, add2Err := asw.AddVolumeNode(volumeSpec, node2Name, devicePath2) + generatedVolumeName2, add2Err := asw.AddVolumeNode(v1.UniqueVolumeName(""), volumeSpec, node2Name, devicePath2) if add2Err != nil { t.Fatalf("AddVolumeNode failed. Expected: Actual: <%v>", add2Err) } diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index 505e11e0718..08ce7effc18 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -184,9 +184,8 @@ func (rc *reconciler) reconcile() { // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. glog.Errorf( - "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", + "operationExecutor.DetachVolume failed to start for volume %q from node %q with err: %v", attachedVolume.VolumeName, - attachedVolume.VolumeSpec.Name(), attachedVolume.NodeName, err) } diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index 79110725579..baf67d9ca71 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -170,8 +170,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te generatedVolumeName, nodeName) } - asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */) - asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) + asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false) + asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false) // Assert waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin) @@ -302,8 +302,8 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate generatedVolumeName, nodeName) } - asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */) - asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) + asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false) + asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false) // Assert verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin) diff --git a/pkg/controller/volume/attachdetach/testing/BUILD b/pkg/controller/volume/attachdetach/testing/BUILD index 1bcc08aafa6..80a90828cf8 100644 --- a/pkg/controller/volume/attachdetach/testing/BUILD +++ b/pkg/controller/volume/attachdetach/testing/BUILD @@ -15,6 +15,8 @@ go_library( "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/util/volumehelper:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/controller/volume/attachdetach/testing/testvolumespec.go b/pkg/controller/volume/attachdetach/testing/testvolumespec.go index b484cfa8ced..2b954e6b79a 100644 --- a/pkg/controller/volume/attachdetach/testing/testvolumespec.go +++ b/pkg/controller/volume/attachdetach/testing/testvolumespec.go @@ -18,7 +18,10 @@ package testing import ( "fmt" + "sync" + "time" + "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -27,8 +30,11 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) +const TestPluginName = "kubernetes.io/testPlugin" + // GetTestVolumeSpec returns a test volume spec func GetTestVolumeSpec(volumeName string, diskName v1.UniqueVolumeName) *volume.Spec { return &volume.Spec{ @@ -45,9 +51,12 @@ func GetTestVolumeSpec(volumeName string, diskName v1.UniqueVolumeName) *volume. } } +var extraPods *v1.PodList + func CreateTestClient() *fake.Clientset { fakeClient := &fake.Clientset{} + extraPods = &v1.PodList{} fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { obj := &v1.PodList{} podNamePrefix := "mypod" @@ -60,6 +69,7 @@ func CreateTestClient() *fake.Clientset { }, ObjectMeta: metav1.ObjectMeta{ Name: podName, + UID: types.UID(podName), Namespace: namespace, Labels: map[string]string{ "name": podName, @@ -91,10 +101,55 @@ func CreateTestClient() *fake.Clientset { }, }, }, + NodeName: "mynode", }, } obj.Items = append(obj.Items, pod) } + for _, pod := range extraPods.Items { + obj.Items = append(obj.Items, pod) + } + return true, obj, nil + }) + fakeClient.AddReactor("create", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + createAction := action.(core.CreateAction) + pod := createAction.GetObject().(*v1.Pod) + extraPods.Items = append(extraPods.Items, *pod) + return true, createAction.GetObject(), nil + }) + fakeClient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := &v1.NodeList{} + nodeNamePrefix := "mynode" + for i := 0; i < 5; i++ { + var nodeName string + if i != 0 { + nodeName = fmt.Sprintf("%s-%d", nodeNamePrefix, i) + } else { + // We want also the "mynode" node since all the testing pods live there + nodeName = nodeNamePrefix + } + node := v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Labels: map[string]string{ + "name": nodeName, + }, + Annotations: map[string]string{ + volumehelper.ControllerManagedAttachAnnotation: "true", + }, + }, + Status: v1.NodeStatus{ + VolumesAttached: []v1.AttachedVolume{ + { + Name: TestPluginName + "/lostVolumeName", + DevicePath: "fake/path", + }, + }, + }, + Spec: v1.NodeSpec{ExternalID: string(nodeName)}, + } + obj.Items = append(obj.Items, node) + } return true, obj, nil }) @@ -114,3 +169,254 @@ func NewPod(uid, name string) *v1.Pod { }, } } + +// NewPod returns a test pod object +func NewPodWithVolume(podName, volumeName, nodeName string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(podName), + Name: podName, + Namespace: "mynamespace", + Labels: map[string]string{ + "name": podName, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "containerName", + Image: "containerImage", + VolumeMounts: []v1.VolumeMount{ + { + Name: "volumeMountName", + ReadOnly: false, + MountPath: "/mnt", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: volumeName, + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "pdName", + FSType: "ext4", + ReadOnly: false, + }, + }, + }, + }, + NodeName: nodeName, + }, + } +} + +type TestPlugin struct { + ErrorEncountered bool + attachedVolumeMap map[string][]string + detachedVolumeMap map[string][]string + pluginLock *sync.RWMutex +} + +func (plugin *TestPlugin) Init(host volume.VolumeHost) error { + return nil +} + +func (plugin *TestPlugin) GetPluginName() string { + return TestPluginName +} + +func (plugin *TestPlugin) GetVolumeName(spec *volume.Spec) (string, error) { + plugin.pluginLock.Lock() + defer plugin.pluginLock.Unlock() + if spec == nil { + glog.Errorf("GetVolumeName called with nil volume spec") + plugin.ErrorEncountered = true + } + return spec.Name(), nil +} + +func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool { + plugin.pluginLock.Lock() + defer plugin.pluginLock.Unlock() + if spec == nil { + glog.Errorf("CanSupport called with nil volume spec") + plugin.ErrorEncountered = true + } + return true +} + +func (plugin *TestPlugin) RequiresRemount() bool { + return false +} + +func (plugin *TestPlugin) NewMounter(spec *volume.Spec, podRef *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) { + plugin.pluginLock.Lock() + defer plugin.pluginLock.Unlock() + if spec == nil { + glog.Errorf("NewMounter called with nil volume spec") + plugin.ErrorEncountered = true + } + return nil, nil +} + +func (plugin *TestPlugin) NewUnmounter(name string, podUID types.UID) (volume.Unmounter, error) { + return nil, nil +} + +func (plugin *TestPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { + fakeVolume := &v1.Volume{ + Name: volumeName, + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "pdName", + FSType: "ext4", + ReadOnly: false, + }, + }, + } + return volume.NewSpecFromVolume(fakeVolume), nil +} + +func (plugin *TestPlugin) NewAttacher() (volume.Attacher, error) { + attacher := testPluginAttacher{ + ErrorEncountered: &plugin.ErrorEncountered, + attachedVolumeMap: plugin.attachedVolumeMap, + pluginLock: plugin.pluginLock, + } + return &attacher, nil +} + +func (plugin *TestPlugin) NewDetacher() (volume.Detacher, error) { + detacher := testPluginDetacher{ + detachedVolumeMap: plugin.detachedVolumeMap, + pluginLock: plugin.pluginLock, + } + return &detacher, nil +} + +func (plugin *TestPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { + return []string{}, nil +} + +func (plugin *TestPlugin) SupportsMountOption() bool { + return false +} + +func (plugin *TestPlugin) SupportsBulkVolumeVerification() bool { + return false +} + +func (plugin *TestPlugin) GetErrorEncountered() bool { + plugin.pluginLock.RLock() + defer plugin.pluginLock.RUnlock() + return plugin.ErrorEncountered +} + +func (plugin *TestPlugin) GetAttachedVolumes() map[string][]string { + plugin.pluginLock.RLock() + defer plugin.pluginLock.RUnlock() + ret := make(map[string][]string) + for nodeName, volumeList := range plugin.attachedVolumeMap { + ret[nodeName] = make([]string, len(volumeList)) + copy(ret[nodeName], volumeList) + } + return ret +} + +func (plugin *TestPlugin) GetDetachedVolumes() map[string][]string { + plugin.pluginLock.RLock() + defer plugin.pluginLock.RUnlock() + ret := make(map[string][]string) + for nodeName, volumeList := range plugin.detachedVolumeMap { + ret[nodeName] = make([]string, len(volumeList)) + copy(ret[nodeName], volumeList) + } + return ret +} + +func CreateTestPlugin() []volume.VolumePlugin { + attachedVolumes := make(map[string][]string) + detachedVolumes := make(map[string][]string) + return []volume.VolumePlugin{&TestPlugin{ + ErrorEncountered: false, + attachedVolumeMap: attachedVolumes, + detachedVolumeMap: detachedVolumes, + pluginLock: &sync.RWMutex{}, + }} +} + +// Attacher +type testPluginAttacher struct { + ErrorEncountered *bool + attachedVolumeMap map[string][]string + pluginLock *sync.RWMutex +} + +func (attacher *testPluginAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) { + attacher.pluginLock.Lock() + defer attacher.pluginLock.Unlock() + if spec == nil { + *attacher.ErrorEncountered = true + glog.Errorf("Attach called with nil volume spec") + return "", fmt.Errorf("Attach called with nil volume spec") + } + attacher.attachedVolumeMap[string(nodeName)] = append(attacher.attachedVolumeMap[string(nodeName)], spec.Name()) + return spec.Name(), nil +} + +func (attacher *testPluginAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + return nil, nil +} + +func (attacher *testPluginAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { + attacher.pluginLock.Lock() + defer attacher.pluginLock.Unlock() + if spec == nil { + *attacher.ErrorEncountered = true + glog.Errorf("WaitForAttach called with nil volume spec") + return "", fmt.Errorf("WaitForAttach called with nil volume spec") + } + fakePath := fmt.Sprintf("%s/%s", devicePath, spec.Name()) + return fakePath, nil +} + +func (attacher *testPluginAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) { + attacher.pluginLock.Lock() + defer attacher.pluginLock.Unlock() + if spec == nil { + *attacher.ErrorEncountered = true + glog.Errorf("GetDeviceMountPath called with nil volume spec") + return "", fmt.Errorf("GetDeviceMountPath called with nil volume spec") + } + return "", nil +} + +func (attacher *testPluginAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error { + attacher.pluginLock.Lock() + defer attacher.pluginLock.Unlock() + if spec == nil { + *attacher.ErrorEncountered = true + glog.Errorf("MountDevice called with nil volume spec") + return fmt.Errorf("MountDevice called with nil volume spec") + } + return nil +} + +// Detacher +type testPluginDetacher struct { + detachedVolumeMap map[string][]string + pluginLock *sync.RWMutex +} + +func (detacher *testPluginDetacher) Detach(volumeName string, nodeName types.NodeName) error { + detacher.pluginLock.Lock() + defer detacher.pluginLock.Unlock() + detacher.detachedVolumeMap[string(nodeName)] = append(detacher.detachedVolumeMap[string(nodeName)], volumeName) + return nil +} + +func (detacher *testPluginDetacher) UnmountDevice(deviceMountPath string) error { + return nil +} diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index b78c76d2f9c..89b29be2a55 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -68,9 +68,10 @@ type VolumePlugin interface { // depend on this. Init(host VolumeHost) error - // Name returns the plugin's name. Plugins should use namespaced names - // such as "example.com/volume". The "kubernetes.io" namespace is - // reserved for plugins which are bundled with kubernetes. + // Name returns the plugin's name. Plugins must use namespaced names + // such as "example.com/volume" and contain exactly one '/' character. + // The "kubernetes.io" namespace is reserved for plugins which are + // bundled with kubernetes. GetPluginName() string // GetVolumeName returns the name/ID to uniquely identifying the actual diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 8e28405786b..f8ae260244a 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -403,6 +403,10 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( for node, nodeAttachedVolumes := range attachedVolumes { for _, volumeAttached := range nodeAttachedVolumes { + if volumeAttached.VolumeSpec == nil { + glog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName) + continue + } volumePlugin, err := oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 985c7c21730..8b2b0f17f98 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -30,6 +30,7 @@ import ( kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) var _ OperationGenerator = &operationGenerator{} @@ -314,35 +315,48 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { - // Get attacher plugin - attachableVolumePlugin, err := - og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) - if err != nil || attachableVolumePlugin == nil { - return nil, fmt.Errorf( - "DetachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v", - volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), - volumeToDetach.NodeName, - err) - } + var volumeName string + var attachableVolumePlugin volume.AttachableVolumePlugin + var err error - volumeName, err := - attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) - if err != nil { - return nil, fmt.Errorf( - "DetachVolume.GetVolumeName failed for volume %q (spec.Name: %q) from node %q with: %v", - volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), - volumeToDetach.NodeName, - err) - } + if volumeToDetach.VolumeSpec != nil { + // Get attacher plugin + attachableVolumePlugin, err = + og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) + if err != nil || attachableVolumePlugin == nil { + return nil, fmt.Errorf( + "DetachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v", + volumeToDetach.VolumeName, + volumeToDetach.VolumeSpec.Name(), + volumeToDetach.NodeName, + err) + } + volumeName, err = + attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) + } else { + var pluginName string + // Get attacher plugin and the volumeName by splitting the volume unique name in case + // there's no VolumeSpec: this happens only on attach/detach controller crash recovery + // when a pod has been deleted during the controller downtime + pluginName, volumeName, err = volumehelper.SplitUniqueName(volumeToDetach.VolumeName) + if err != nil { + return nil, err + } + attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName) + if err != nil { + return nil, fmt.Errorf( + "Failed to find plugin for volume %q from node %q with: %v", + volumeToDetach.VolumeName, + volumeToDetach.NodeName, + err) + } + } volumeDetacher, err := attachableVolumePlugin.NewDetacher() if err != nil { return nil, fmt.Errorf( - "DetachVolume.NewDetacher failed for volume %q (spec.Name: %q) from node %q with: %v", + "DetachVolume.NewDetacher failed for volume %q from node %q with: %v", volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName, err) } @@ -360,17 +374,15 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( actualStateOfWorld.AddVolumeToReportAsAttached( volumeToDetach.VolumeName, volumeToDetach.NodeName) return fmt.Errorf( - "DetachVolume.Detach failed for volume %q (spec.Name: %q) from node %q with: %v", + "DetachVolume.Detach failed for volume %q from node %q with: %v", volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName, err) } glog.Infof( - "DetachVolume.Detach succeeded for volume %q (spec.Name: %q) from node %q.", + "DetachVolume.Detach succeeded for volume %q from node %q.", volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName) // Update actual state of world @@ -865,16 +877,14 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach( if errors.IsNotFound(fetchErr) { glog.Warningf("Node %q not found on API server. DetachVolume will skip safe to detach check.", volumeToDetach.NodeName, - volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name()) + volumeToDetach.VolumeName) return nil } // On failure, return error. Caller will log and retry. return fmt.Errorf( - "DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v", + "DetachVolume failed fetching node from API server for volume %q from node %q with: %v", volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName, fetchErr) } @@ -882,25 +892,22 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach( if node == nil { // On failure, return error. Caller will log and retry. return fmt.Errorf( - "DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil", + "DetachVolume failed fetching node from API server for volume %q from node %q. Error: node object retrieved from API server is nil", volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName) } for _, inUseVolume := range node.Status.VolumesInUse { if inUseVolume == volumeToDetach.VolumeName { - return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status", + return fmt.Errorf("DetachVolume failed for volume %q from node %q. Error: volume is still in use by node, according to Node status", volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName) } } // Volume is not marked as in use by node - glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.", + glog.Infof("Verified volume is safe to detach for volume %q from node %q.", volumeToDetach.VolumeName, - volumeToDetach.VolumeSpec.Name(), volumeToDetach.NodeName) return nil } diff --git a/pkg/volume/util/volumehelper/volumehelper.go b/pkg/volume/util/volumehelper/volumehelper.go index c55c8db60ec..d4dd45dfe3e 100644 --- a/pkg/volume/util/volumehelper/volumehelper.go +++ b/pkg/volume/util/volumehelper/volumehelper.go @@ -20,6 +20,7 @@ package volumehelper import ( "fmt" + "strings" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/volume" @@ -103,3 +104,18 @@ func notRunning(statuses []v1.ContainerStatus) bool { } return true } + +// SplitUniqueName splits the unique name to plugin name and volume name strings. It expects the uniqueName to follow +// the fromat plugin_name/volume_name and the plugin name must be namespaced as descibed by the plugin interface, +// i.e. namespace/plugin containing exactly one '/'. This means the unique name will always be in the form of +// plugin_namespace/plugin/volume_name, see k8s.io/kubernetes/pkg/volume/plugins.go VolumePlugin interface +// description and pkg/volume/util/volumehelper/volumehelper.go GetUniqueVolumeNameFromSpec that constructs +// the unique volume names. +func SplitUniqueName(uniqueName v1.UniqueVolumeName) (string, string, error) { + components := strings.SplitN(string(uniqueName), "/", 3) + if len(components) != 3 { + return "", "", fmt.Errorf("cannot split volume unique name %s to plugin/volume components", uniqueName) + } + pluginName := fmt.Sprintf("%s/%s", components[0], components[1]) + return pluginName, components[2], nil +}