Separate staging/publish and unstaging/unpublish logics for block

This commit is contained in:
Masaki Kimura 2019-11-01 01:25:00 +00:00
parent 7caf731773
commit 4578c6c8ce
3 changed files with 187 additions and 41 deletions

View File

@ -14,6 +14,55 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
/*
This file defines block volume related methods for CSI driver.
CSI driver is responsible for staging/publishing volumes to their staging/publish paths.
Mapping and unmapping of a device in a publish path to its global map path and its
pod device map path are done by operation_executor through MapBlockVolume/UnmapBlockVolume
(MapBlockVolume and UnmapBlockVolume take care for lock, symlink, and bind mount).
Summary of block volume related CSI driver's methods are as follows:
- GetGlobalMapPath returns a global map path,
- GetPodDeviceMapPath returns a pod device map path and filename,
- SetUpDevice calls CSI's NodeStageVolume and stage a volume to its staging path,
- MapPodDevice calls CSI's NodePublishVolume and publish a volume to its publish path,
- UnmapPodDevice calls CSI's NodeUnpublishVolume and unpublish a volume from its publish path,
- TearDownDevice calls CSI's NodeUnstageVolume and unstage a volume from its staging path.
These methods are called by below sequences:
- operation_executor.MountVolume
- csi.GetGlobalMapPath
- csi.SetupDevice
- NodeStageVolume
- ASW.MarkDeviceAsMounted
- csi.GetPodDeviceMapPath
- csi.MapPodDevice
- NodePublishVolume
- util.MapBlockVolume
- ASW.MarkVolumeAsMounted
- operation_executor.UnmountVolume
- csi.GetPodDeviceMapPath
- util.UnmapBlockVolume
- csi.UnmapPodDevice
- NodeUnpublishVolume
- ASW.MarkVolumeAsUnmounted
- operation_executor.UnmountDevice
- csi.TearDownDevice
- NodeUnstageVolume
- ASW.MarkDeviceAsUnmounted
After successful MountVolume for block volume, directory structure will be like below:
/dev/loopX ... Descriptor lock(Loopback device to mapFile under global map path)
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/ ... Global map path
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/{podUID} ... MapFile(Bind mount to publish Path)
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/staging/{specName} ... Staging path
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID} ... Publish path
/var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/ ... Pod device map path
/var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/{specName} ... MapFile(Symlink to publish path)
*/
package csi
import (
@ -71,7 +120,7 @@ func (m *csiBlockMapper) getPublishPath() string {
}
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {specName}
// returns: pods/{podUID}/volumeDevices/kubernetes.io~csi, {specName}
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, m.specName))
@ -147,7 +196,6 @@ func (m *csiBlockMapper) publishVolumeForBlock(
accessMode v1.PersistentVolumeAccessMode,
csiSource *v1.CSIPersistentVolumeSource,
attachment *storage.VolumeAttachment,
stagingPath string,
) (string, error) {
klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))
@ -183,7 +231,7 @@ func (m *csiBlockMapper) publishVolumeForBlock(
ctx,
m.volumeID,
m.readOnly,
stagingPath,
m.getStagingPath(),
publishPath,
accessMode,
publishVolumeInfo,
@ -249,13 +297,7 @@ func (m *csiBlockMapper) SetUpDevice() error {
}
// Call NodeStageVolume
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
return err
}
// Call NodePublishVolume
_, err = m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
return err
}
@ -264,7 +306,59 @@ func (m *csiBlockMapper) SetUpDevice() error {
}
func (m *csiBlockMapper) MapPodDevice() (string, error) {
return m.getPublishPath(), nil
if !m.plugin.blockEnabled {
return "", errors.New("CSIBlockVolume feature not enabled")
}
klog.V(4).Infof(log("blockMapper.MapPodDevice called"))
// Get csiSource from spec
if m.spec == nil {
return "", errors.New(log("blockMapper.MapPodDevice spec is nil"))
}
csiSource, err := getCSISourceFromSpec(m.spec)
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI persistent source: %v", err))
}
driverName := csiSource.Driver
skip, err := m.plugin.skipAttach(driverName)
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to check CSIDriver for %s: %v", driverName, err))
}
var attachment *storage.VolumeAttachment
if !skip {
// Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
nodeName := string(m.plugin.host.GetNodeName())
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to get volume attachment [id=%v]: %v", attachID, err))
}
}
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := v1.ReadWriteOnce
if m.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
csiClient, err := m.csiClientGetter.Get()
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI client: %v", err))
}
// Call NodePublishVolume
publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
return "", err
}
return publishPath, nil
}
var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
@ -319,8 +413,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return errors.New("CSIBlockVolume feature not enabled")
}
klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
@ -329,21 +421,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return errors.New(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
}
// Call NodeUnpublishVolume
publishPath := m.getPublishPath()
if _, err := os.Stat(publishPath); err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
} else {
return err
}
} else {
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
if err != nil {
return err
}
}
// Call NodeUnstageVolume
stagingPath := m.getStagingPath()
if _, err := os.Stat(stagingPath); err != nil {
@ -364,5 +441,32 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
// UnmapPodDevice unmaps the block device path.
func (m *csiBlockMapper) UnmapPodDevice() error {
if !m.plugin.blockEnabled {
return errors.New("CSIBlockVolume feature not enabled")
}
publishPath := m.getPublishPath()
csiClient, err := m.csiClientGetter.Get()
if err != nil {
return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Call NodeUnpublishVolume
if _, err := os.Stat(publishPath); err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof(log("blockMapper.UnmapPodDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
} else {
return err
}
} else {
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
if err != nil {
return err
}
}
return nil
}

View File

@ -49,6 +49,40 @@ func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T
return csiMapper, spec, pv, nil
}
func prepareBlockUnmapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV(specVolumeName, 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
// save volume data
dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
t.Errorf("failed to create dir [%s]: %v", dir, err)
}
if err := saveVolumeData(
dir,
volDataFileName,
map[string]string{
volDataKey.specVolID: pv.ObjectMeta.Name,
volDataKey.driverName: testDriver,
volDataKey.volHandle: testVol,
},
); err != nil {
t.Fatalf("failed to save volume data: %v", err)
}
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
if err != nil {
t.Fatalf("failed to make a new Unmapper: %v", err)
}
csiUnmapper := unmapper.(*csiBlockMapper)
csiUnmapper.csiClient = setupClient(t, true)
return csiUnmapper, spec, pv, nil
}
func TestBlockMapperGetGlobalMapPath(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
@ -254,18 +288,6 @@ func TestBlockMapperSetupDevice(t *testing.T) {
if svol.Path != stagingPath {
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
}
// Check if NodePublishVolume published to the right path
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
pvol, ok := pvols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
publishPath := csiMapper.getPublishPath()
if pvol.Path != publishPath {
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
}
}
func TestBlockMapperMapPodDevice(t *testing.T) {
@ -307,9 +329,20 @@ func TestBlockMapperMapPodDevice(t *testing.T) {
if err != nil {
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
}
// Check if NodePublishVolume published to the right path
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
pvol, ok := pvols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}
publishPath := csiMapper.getPublishPath()
if pvol.Path != publishPath {
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
}
if path != publishPath {
t.Errorf("path %s and %s doesn't match", path, publishPath)
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
}
}

View File

@ -120,11 +120,20 @@ func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
return f.nodePublishedVolumes
}
// AddNodePublishedVolume adds specified volume to nodePublishedVolumes
func (f *NodeClient) AddNodePublishedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
f.nodePublishedVolumes[volID] = CSIVolume{
Path: deviceMountPath,
VolumeContext: volumeContext,
}
}
// GetNodeStagedVolumes returns node staged volumes
func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
return f.nodeStagedVolumes
}
// AddNodeStagedVolume adds specified volume to nodeStagedVolumes
func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
f.nodeStagedVolumes[volID] = CSIVolume{
Path: deviceMountPath,