diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index bd1fdc55637..8b46d3c3e72 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -14,6 +14,55 @@ See the License for the specific language governing permissions and limitations under the License. */ +/* +This file defines block volume related methods for CSI driver. +CSI driver is responsible for staging/publishing volumes to their staging/publish paths. +Mapping and unmapping of a device in a publish path to its global map path and its +pod device map path are done by operation_executor through MapBlockVolume/UnmapBlockVolume +(MapBlockVolume and UnmapBlockVolume take care for lock, symlink, and bind mount). + +Summary of block volume related CSI driver's methods are as follows: + - GetGlobalMapPath returns a global map path, + - GetPodDeviceMapPath returns a pod device map path and filename, + - SetUpDevice calls CSI's NodeStageVolume and stage a volume to its staging path, + - MapPodDevice calls CSI's NodePublishVolume and publish a volume to its publish path, + - UnmapPodDevice calls CSI's NodeUnpublishVolume and unpublish a volume from its publish path, + - TearDownDevice calls CSI's NodeUnstageVolume and unstage a volume from its staging path. + +These methods are called by below sequences: + - operation_executor.MountVolume + - csi.GetGlobalMapPath + - csi.SetupDevice + - NodeStageVolume + - ASW.MarkDeviceAsMounted + - csi.GetPodDeviceMapPath + - csi.MapPodDevice + - NodePublishVolume + - util.MapBlockVolume + - ASW.MarkVolumeAsMounted + + - operation_executor.UnmountVolume + - csi.GetPodDeviceMapPath + - util.UnmapBlockVolume + - csi.UnmapPodDevice + - NodeUnpublishVolume + - ASW.MarkVolumeAsUnmounted + + - operation_executor.UnmountDevice + - csi.TearDownDevice + - NodeUnstageVolume + - ASW.MarkDeviceAsUnmounted + +After successful MountVolume for block volume, directory structure will be like below: + /dev/loopX ... Descriptor lock(Loopback device to mapFile under global map path) + /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/ ... Global map path + /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/{podUID} ... MapFile(Bind mount to publish Path) + /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/staging/{specName} ... Staging path + /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID} ... Publish path + /var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/ ... Pod device map path + /var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/{specName} ... MapFile(Symlink to publish path) +*/ + package csi import ( @@ -71,7 +120,7 @@ func (m *csiBlockMapper) getPublishPath() string { } // GetPodDeviceMapPath returns pod's device file which will be mapped to a volume -// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {specName} +// returns: pods/{podUID}/volumeDevices/kubernetes.io~csi, {specName} func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) { path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName)) klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, m.specName)) @@ -147,7 +196,6 @@ func (m *csiBlockMapper) publishVolumeForBlock( accessMode v1.PersistentVolumeAccessMode, csiSource *v1.CSIPersistentVolumeSource, attachment *storage.VolumeAttachment, - stagingPath string, ) (string, error) { klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called")) @@ -183,7 +231,7 @@ func (m *csiBlockMapper) publishVolumeForBlock( ctx, m.volumeID, m.readOnly, - stagingPath, + m.getStagingPath(), publishPath, accessMode, publishVolumeInfo, @@ -249,13 +297,7 @@ func (m *csiBlockMapper) SetUpDevice() error { } // Call NodeStageVolume - stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) - if err != nil { - return err - } - - // Call NodePublishVolume - _, err = m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath) + _, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) if err != nil { return err } @@ -264,7 +306,59 @@ func (m *csiBlockMapper) SetUpDevice() error { } func (m *csiBlockMapper) MapPodDevice() (string, error) { - return m.getPublishPath(), nil + if !m.plugin.blockEnabled { + return "", errors.New("CSIBlockVolume feature not enabled") + } + klog.V(4).Infof(log("blockMapper.MapPodDevice called")) + + // Get csiSource from spec + if m.spec == nil { + return "", errors.New(log("blockMapper.MapPodDevice spec is nil")) + } + + csiSource, err := getCSISourceFromSpec(m.spec) + if err != nil { + return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI persistent source: %v", err)) + } + + driverName := csiSource.Driver + skip, err := m.plugin.skipAttach(driverName) + if err != nil { + return "", errors.New(log("blockMapper.MapPodDevice failed to check CSIDriver for %s: %v", driverName, err)) + } + + var attachment *storage.VolumeAttachment + if !skip { + // Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName + nodeName := string(m.plugin.host.GetNodeName()) + attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName) + attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + return "", errors.New(log("blockMapper.MapPodDevice failed to get volume attachment [id=%v]: %v", attachID, err)) + } + } + + //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI + accessMode := v1.ReadWriteOnce + if m.spec.PersistentVolume.Spec.AccessModes != nil { + accessMode = m.spec.PersistentVolume.Spec.AccessModes[0] + } + + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + csiClient, err := m.csiClientGetter.Get() + if err != nil { + return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI client: %v", err)) + } + + // Call NodePublishVolume + publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) + if err != nil { + return "", err + } + + return publishPath, nil } var _ volume.BlockVolumeUnmapper = &csiBlockMapper{} @@ -319,8 +413,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error return errors.New("CSIBlockVolume feature not enabled") } - klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath)) - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() @@ -329,21 +421,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error return errors.New(log("blockMapper.TearDownDevice failed to get CSI client: %v", err)) } - // Call NodeUnpublishVolume - publishPath := m.getPublishPath() - if _, err := os.Stat(publishPath); err != nil { - if os.IsNotExist(err) { - klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath)) - } else { - return err - } - } else { - err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath) - if err != nil { - return err - } - } - // Call NodeUnstageVolume stagingPath := m.getStagingPath() if _, err := os.Stat(stagingPath); err != nil { @@ -364,5 +441,32 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error // UnmapPodDevice unmaps the block device path. func (m *csiBlockMapper) UnmapPodDevice() error { + if !m.plugin.blockEnabled { + return errors.New("CSIBlockVolume feature not enabled") + } + publishPath := m.getPublishPath() + + csiClient, err := m.csiClientGetter.Get() + if err != nil { + return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err)) + } + + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + // Call NodeUnpublishVolume + if _, err := os.Stat(publishPath); err != nil { + if os.IsNotExist(err) { + klog.V(4).Infof(log("blockMapper.UnmapPodDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath)) + } else { + return err + } + } else { + err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 232b6d96be2..61e76293d35 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -49,6 +49,40 @@ func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T return csiMapper, spec, pv, nil } +func prepareBlockUnmapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) { + registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) + pv := makeTestPV(specVolumeName, 10, testDriver, testVol) + spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) + + // save volume data + dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) + if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) { + t.Errorf("failed to create dir [%s]: %v", dir, err) + } + + if err := saveVolumeData( + dir, + volDataFileName, + map[string]string{ + volDataKey.specVolID: pv.ObjectMeta.Name, + volDataKey.driverName: testDriver, + volDataKey.volHandle: testVol, + }, + ); err != nil { + t.Fatalf("failed to save volume data: %v", err) + } + + unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID) + if err != nil { + t.Fatalf("failed to make a new Unmapper: %v", err) + } + + csiUnmapper := unmapper.(*csiBlockMapper) + csiUnmapper.csiClient = setupClient(t, true) + + return csiUnmapper, spec, pv, nil +} + func TestBlockMapperGetGlobalMapPath(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() @@ -254,18 +288,6 @@ func TestBlockMapperSetupDevice(t *testing.T) { if svol.Path != stagingPath { t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path) } - - // Check if NodePublishVolume published to the right path - pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() - pvol, ok := pvols[csiMapper.volumeID] - if !ok { - t.Error("csi server may not have received NodePublishVolume call") - } - - publishPath := csiMapper.getPublishPath() - if pvol.Path != publishPath { - t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path) - } } func TestBlockMapperMapPodDevice(t *testing.T) { @@ -307,9 +329,20 @@ func TestBlockMapperMapPodDevice(t *testing.T) { if err != nil { t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) } + + // Check if NodePublishVolume published to the right path + pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() + pvol, ok := pvols[csiMapper.volumeID] + if !ok { + t.Error("csi server may not have received NodePublishVolume call") + } + publishPath := csiMapper.getPublishPath() + if pvol.Path != publishPath { + t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path) + } if path != publishPath { - t.Errorf("path %s and %s doesn't match", path, publishPath) + t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path) } } diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 1f287e42a04..19072963da0 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -120,11 +120,20 @@ func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume { return f.nodePublishedVolumes } +// AddNodePublishedVolume adds specified volume to nodePublishedVolumes +func (f *NodeClient) AddNodePublishedVolume(volID, deviceMountPath string, volumeContext map[string]string) { + f.nodePublishedVolumes[volID] = CSIVolume{ + Path: deviceMountPath, + VolumeContext: volumeContext, + } +} + // GetNodeStagedVolumes returns node staged volumes func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume { return f.nodeStagedVolumes } +// AddNodeStagedVolume adds specified volume to nodeStagedVolumes func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeContext map[string]string) { f.nodeStagedVolumes[volID] = CSIVolume{ Path: deviceMountPath,