init csi-driver-nvmf

Signed-off-by: zhouenhua <zhouenhua@bytedance.com>
This commit is contained in:
zhouenhua
2021-09-10 11:50:07 +08:00
commit 6a6ac61e01
31 changed files with 2544 additions and 0 deletions

31
pkg/nvmf/const.go Normal file
View File

@@ -0,0 +1,31 @@
package nvmf
const (
NVMF_NQN_SIZE = 223
SYS_NVMF = "/sys/class/nvmf"
)
// Here erron
const (
ENOENT = 1 /* No such file or directory */
EINVAL = 2 /* Invalid argument */
)
const (
DefaultDriverName = "csi.nvmf.com"
DefaultDriverServicePort = "12230"
DefaultDriverVersion = "v1.0.0"
DefaultVolumeMapPath = "/var/lib/nvmf/volumes"
)
type GlobalConfig struct {
NVMfVolumeMapDir string
DriverName string
Region string
NodeID string
Endpoint string // CSI endpoint
Version string
IsControllerServer bool
LogLevel string
}

View File

@@ -0,0 +1,78 @@
package nvmf
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
)
type ControllerServer struct {
Driver *driver
}
// create controller server
func NewControllerServer(d *driver) *ControllerServer {
return &ControllerServer{
Driver: d,
}
}
// You should realize your volume provider here, such as requesting the Cloud to create an NVMf block and
// returning specific information to you
func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented,"CreateVolume should implement by yourself. ")
}
func (c *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "DeleteVolume should implement by yourself. ")
}
func (c *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ControllerExpandVolume should implement by yourself")
}
func (c *ControllerServer) ControllerGetVolume(ctx context.Context, request *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ControllerGetVolume not implement")
}
func (c *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ControllerPublishVolume not implement")
}
func (c *ControllerServer) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ControllerUnpublishVolume not implement")
}
func (c *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, request *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ValidateVolumeCapabilities not implement")
}
func (c *ControllerServer) ListVolumes(ctx context.Context, request *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ListVolumes not implement")
}
func (c *ControllerServer) GetCapacity(ctx context.Context, request *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "GetCapacity not implement")
}
func (c *ControllerServer) ControllerGetCapabilities(ctx context.Context, request *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
klog.Infof("Using default ControllerGetCapabilities")
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: c.Driver.cscap,
}, nil
}
func (c *ControllerServer) CreateSnapshot(ctx context.Context, request *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "CreateSnapshot not implement")
}
func (c *ControllerServer) DeleteSnapshot(ctx context.Context, request *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "DeleteSnapshot not implement")
}
func (c *ControllerServer) ListSnapshots(ctx context.Context, request *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "ListSnapshots not implement")
}

39
pkg/nvmf/disk.go Normal file
View File

@@ -0,0 +1,39 @@
package nvmf
import (
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
)
type nvmfDiskInfo struct {
VolId string
Nqn string
Addr string
Port string
DeviceUUID string
Transport string
}
func getNVMfInfo(req *csi.NodePublishVolumeRequest) (*nvmfDiskInfo, error) {
volId := req.GetVolumeId()
volOpts := req.GetVolumeContext()
targetTrAddr := volOpts["targetTrAddr"]
targetTrPort := volOpts["targetTrPort"]
targetTrType := volOpts["targetTrType"]
deviceUUID := volOpts["deviceUUID"]
nqn := volOpts["nqn"]
if targetTrAddr == "" || nqn == "" || targetTrPort == "" || targetTrType == "" {
return nil, fmt.Errorf("Some Nvme target info is missing, volID: %s ", volId)
}
return &nvmfDiskInfo{
VolId: volId,
Addr: targetTrAddr,
Port: targetTrPort,
Nqn: nqn,
DeviceUUID: deviceUUID,
Transport: targetTrType,
}, nil
}

106
pkg/nvmf/driver.go Normal file
View File

@@ -0,0 +1,106 @@
package nvmf
import (
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
)
type driver struct {
name string
nodeId string
version string
region string
volumeMapDir string
idServer *IdentityServer
nodeServer *NodeServer
controllerServer *ControllerServer
cap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
}
// NewDriver create the identity/node
func NewDriver(conf *GlobalConfig) *driver {
if conf.DriverName == "" {
klog.Fatalf("driverName not been specified")
return nil
}
klog.Infof("Driver: %v version: %v", conf.DriverName, conf.Version)
return &driver{
name: conf.DriverName,
version: conf.Version,
nodeId: conf.NodeID,
region: conf.Region,
volumeMapDir: conf.NVMfVolumeMapDir,
}
}
func (d *driver) Run(conf *GlobalConfig) {
d.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
})
d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
})
d.idServer = NewIdentityServer(d)
d.nodeServer = NewNodeServer(d)
if conf.IsControllerServer {
d.controllerServer = NewControllerServer(d)
}
klog.Infof("Starting csi-plugin Driver: %v", d.name)
s := NewNonBlockingGRPCServer()
s.Start(conf.Endpoint, d.idServer, d.controllerServer, d.nodeServer)
s.Wait()
}
func (d *driver) AddVolumeCapabilityAccessModes(caps []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode {
var cap []*csi.VolumeCapability_AccessMode
for _, c := range caps {
klog.Infof("Enabling volume access mode: %v", c.String())
cap = append(cap, &csi.VolumeCapability_AccessMode{Mode: c})
}
d.cap = cap
return cap
}
func (d *driver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) {
var csc []*csi.ControllerServiceCapability
for _, c := range cl {
klog.Infof("Enabling controller service capability: %v", c.String())
csc = append(csc, &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: c,
},
},
})
}
d.cscap = csc
return
}
func (d *driver) ValidateControllerServiceRequest(c csi.ControllerServiceCapability_RPC_Type) error {
if c == csi.ControllerServiceCapability_RPC_UNKNOWN {
return nil
}
for _, cap := range d.cscap {
if c == cap.GetRpc().GetType() {
return nil
}
}
return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c))
}

211
pkg/nvmf/fabrics.go Normal file
View File

@@ -0,0 +1,211 @@
package nvmf
import (
"csi-driver-nvmf/pkg/utils"
"encoding/json"
"fmt"
"io/ioutil"
"k8s.io/klog"
"os"
"strings"
)
func getConnector(nvmfInfo *nvmfDiskInfo) *Connector {
return &Connector{
VolumeID: nvmfInfo.VolId,
DeviceUUID: nvmfInfo.DeviceUUID,
TargetNqn: nvmfInfo.Nqn,
TargetAddr: nvmfInfo.Addr,
TargetPort: nvmfInfo.Port,
Transport: nvmfInfo.Transport,
}
}
// connector provides a struct to hold all of the needed parameters to make nvmf connection
type Connector struct {
VolumeID string
DeviceUUID string
TargetNqn string
TargetAddr string
TargetPort string
Transport string
RetryCount int32
CheckInterval int32
}
func _connect(argStr string) error {
file, err := os.OpenFile("/dev/nvmf-fabrics", os.O_RDWR, 0666)
if err != nil {
klog.Errorf("Connect: open NVMf fabrics error: %v", err)
return err
}
defer file.Close()
err = utils.WriteStringToFile(file, argStr)
if err != nil {
klog.Errorf("Connect: write arg to connect file error: %v", err)
return err
}
// todo: read file to verify
lines, err := utils.ReadLinesFromFile(file)
klog.Infof("Connect: read string %s", lines)
return nil
}
func _disconnect(sysfs_path string) error {
file, err := os.OpenFile(sysfs_path, os.O_WRONLY, 0755)
if err != nil {
return err
}
err = utils.WriteStringToFile(file, "1")
if err != nil {
klog.Errorf("Disconnect: write 1 to delete_controller error: %v", err)
return err
}
return nil
}
func disconnectSubsys(nqn, ctrl string) (res bool) {
sysfs_nqn_path := fmt.Sprintf("%s/%s/subsysnqn", SYS_NVMF, ctrl)
sysfs_del_path := fmt.Sprintf("%s/%s/delete_controller", SYS_NVMF, ctrl)
file, err := os.Open(sysfs_nqn_path)
if err != nil {
klog.Errorf("Disconnect: open file %s err: %v", file.Name(), err)
return false
}
defer file.Close()
lines, err := utils.ReadLinesFromFile(file)
if err != nil {
klog.Errorf("Disconnect: read file %s err: %v", file.Name(), err)
return false
}
if lines[0] != nqn {
klog.Warningf("Disconnect: not this subsystem, skip")
return false
}
err = _disconnect(sysfs_del_path)
if err != nil {
klog.Errorf("Disconnect: disconnect error: %s", err)
return false
}
return true
}
func disconnectByNqn(nqn string) int {
ret := 0
if len(nqn) > NVMF_NQN_SIZE {
klog.Errorf("Disconnect: nqn %s is too long ", nqn)
return -EINVAL
}
devices, err := ioutil.ReadDir(SYS_NVMF)
if err != nil {
klog.Errorf("Disconnect: readdir %s err: %s", SYS_NVMF, err)
return -ENOENT
}
for _, device := range devices {
if disconnectSubsys(nqn, device.Name()) {
ret++
}
}
return ret
}
// connect to volume to this node and return devicePath
func Connect(c *Connector) (string, error) {
if c.RetryCount == 0 {
c.RetryCount = 10
}
if c.CheckInterval == 0 {
c.CheckInterval = 1
}
if c.RetryCount < 0 || c.CheckInterval < 0 {
return "", fmt.Errorf("Invalid RetryCount and CheckInterval combinaiton "+
"RetryCount: %d, CheckInterval: %d ", c.RetryCount, c.CheckInterval)
}
if strings.ToLower(c.Transport) != "tcp" && strings.ToLower(c.Transport) != "rdma" {
return "", fmt.Errorf("csi transport only support tcp/rdma ")
}
baseString := fmt.Sprintf("nqn=%s,transport=%s,traddr=%s,trsvcid=%s", c.TargetNqn, c.Transport, c.TargetAddr, c.TargetPort)
devicePath := strings.Join([]string{"/dev/disk/by-id/nvmf-uuid", c.DeviceUUID}, ".")
// connect to nvmf disk
err := _connect(baseString)
if err != nil {
return "", err
}
klog.Infof("Connect Volume %s success nqn: %s", c.VolumeID, c.TargetNqn)
retries := int(c.RetryCount / c.CheckInterval)
if exists, err := waitForPathToExist(devicePath, retries, int(c.CheckInterval), c.Transport); !exists {
klog.Errorf("connect nqn %s error %v, rollback", c.TargetNqn, err)
ret := disconnectByNqn(c.TargetNqn)
if ret < 0 {
klog.Errorf("rollback error !!!")
}
return "", err
}
klog.Infof("After connect we're returning devicePath: %s", devicePath)
return devicePath, nil
}
// we disconnect only by nqn
func Disconnect(c *Connector) error {
ret := disconnectByNqn(c.TargetNqn)
if ret == 0 {
return fmt.Errorf("Disconnect: failed to disconnect by nqn: %s ", c.TargetNqn)
}
return nil
}
// PersistConnector persists the provided Connector to the specified file (ie /var/lib/pfile/myConnector.json)
func persistConnector(c *Connector, filePath string) error {
//file := path.Join("mnt", c.VolumeName+".json")
f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating nvmf persistence file %s: %s", filePath, err)
}
defer f.Close()
encoder := json.NewEncoder(f)
if err = encoder.Encode(c); err != nil {
return fmt.Errorf("error encoding connector: %v", err)
}
return nil
}
func removeConnector(targetPath string) {
// todo: here maybe be attack for os.Remove can operate any file, fix?
if err := os.Remove(targetPath + ".json"); err != nil {
klog.Errorf("DetachDisk: Can't remove connector file: %s", targetPath)
}
if err := os.RemoveAll(targetPath); err != nil {
klog.Errorf("DetachDisk: failed to remove mount path Error: %v", err)
}
}
func GetConnectorFromFile(filePath string) (*Connector, error) {
f, err := ioutil.ReadFile(filePath)
if err != nil {
return &Connector{}, err
}
data := Connector{}
err = json.Unmarshal([]byte(f), &data)
if err != nil {
return &Connector{}, err
}
return &data, nil
}

View File

@@ -0,0 +1,65 @@
package nvmf
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
)
type IdentityServer struct {
Driver *driver
}
// Create IdentityServer
func NewIdentityServer(d *driver) *IdentityServer {
return &IdentityServer{
Driver: d,
}
}
func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
klog.V(5).Infof("Using default GetPluginInfo")
if ids.Driver.name == "" {
return nil, status.Error(codes.Unavailable, "Driver name not configured")
}
if ids.Driver.version == "" {
return nil, status.Error(codes.Unavailable, "Driver is missing version")
}
return &csi.GetPluginInfoResponse{
Name: ids.Driver.name,
VendorVersion: ids.Driver.version,
}, nil
}
func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
klog.V(5).Infof("Identity: getPluginCapabilities ")
resp := &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
{
Type: &csi.PluginCapability_VolumeExpansion_{
VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
Type: csi.PluginCapability_VolumeExpansion_OFFLINE,
},
},
},
},
}
return resp, nil
}

47
pkg/nvmf/mounter.go Normal file
View File

@@ -0,0 +1,47 @@
package nvmf
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/utils/exec"
"k8s.io/utils/mount"
)
type nvmfDiskMounter struct {
*nvmfDiskInfo
readOnly bool
fsType string
mountOptions []string
mounter *mount.SafeFormatAndMount
exec exec.Interface
targetPath string
connector *Connector
}
type nvmfDiskUnMounter struct {
mounter mount.Interface
exec exec.Interface
}
func getNVMfDiskMounter(nvmfInfo *nvmfDiskInfo, req *csi.NodePublishVolumeRequest) *nvmfDiskMounter {
readOnly := req.GetReadonly()
fsType := req.GetVolumeCapability().GetMount().GetFsType()
mountOptions := req.GetVolumeCapability().GetMount().GetMountFlags()
return &nvmfDiskMounter{
nvmfDiskInfo : nvmfInfo,
readOnly: readOnly,
fsType: fsType,
mountOptions: mountOptions,
mounter: &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: exec.New()},
exec: exec.New(),
targetPath: req.GetTargetPath(),
connector: getConnector(nvmfInfo),
}
}
func getNVMfDiskUnMounter(req *csi.NodeUnpublishVolumeRequest) *nvmfDiskUnMounter {
return &nvmfDiskUnMounter{
mounter: mount.New(""),
exec: exec.New(),
}
}

128
pkg/nvmf/nodeserver.go Normal file
View File

@@ -0,0 +1,128 @@
package nvmf
import (
"csi-driver-nvmf/pkg/utils"
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
"os"
)
type NodeServer struct {
Driver *driver
}
func NewNodeServer(d *driver) *NodeServer {
return &NodeServer{
Driver: d,
}
}
func (n *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
klog.Infof("Using Nvme NodeGetCapabilities")
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
},
},
},
}, nil
}
func (n *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
// 1. check parameters
targetPath := req.TargetPath
if req.VolumeId == "" {
return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume VolumeID must be provided")
}
if targetPath == "" {
return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume Target Path must be provided")
}
if req.VolumeCapability == nil {
return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
}
// 2. attachdisk
nvmfInfo, err := getNVMfInfo(req)
if err != nil {
return nil, status.Errorf(codes.Internal, "NodePublishVolume: get NVMf disk info from req err: %v", err)
}
diskMounter := getNVMfDiskMounter(nvmfInfo, req)
// attachDisk realize connect NVMf disk and mount to docker path
_, err = AttachDisk(req, *diskMounter)
if err != nil {
klog.Errorf("NodePublishVolume: Attach volume %s with error: %s", req.VolumeId, err.Error())
return nil, err
}
return &csi.NodePublishVolumeResponse{}, nil
}
func (n *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
klog.Infof("NodeUnpublishVolume: Starting unpublish volume, %s, %v", req.VolumeId, req)
if req.VolumeId == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume VolumeID must be provided")
}
if req.TargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Staging TargetPath must be provided")
}
targetPath := req.GetTargetPath()
err := DetachDisk(req.VolumeId, getNVMfDiskUnMounter(req), targetPath)
if err != nil {
klog.Errorf("NodeUnpublishVolume: VolumeID: %s detachDisk err: %v", req.VolumeId, err)
return nil, err
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (n *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return &csi.NodeStageVolumeResponse{}, nil
}
func (n *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return &csi.NodeUnstageVolumeResponse{}, nil
}
func (n *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
deviceName, err := GetDeviceNameByVolumeID(req.VolumeId)
if err != nil {
klog.Errorf("NodeExpandVolume: Get Device by volumeID: %s error %v", req.VolumeId, err)
return nil, status.Errorf(codes.Internal, "NodeExpandVolume: Get Device by volumeID: %s error %v", req.VolumeId, err)
}
scanPath := parseDeviceToControllerPath(deviceName)
if utils.IsFileExisting(scanPath) {
file, err := os.OpenFile(scanPath, os.O_RDWR|os.O_TRUNC, 0766)
err = utils.WriteStringToFile(file, "1")
if err != nil {
klog.Errorf("NodeExpandVolume: Rescan error: %v", err)
return nil, status.Errorf(codes.Internal, "NodeExpandVolume: Rescan error: %v", err)
}
} else {
return nil, status.Errorf(codes.Internal, "NodeExpandVolume: rescan path %s not exist", scanPath)
}
return &csi.NodeExpandVolumeResponse{}, nil
}
func (n *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: n.Driver.nodeId,
}, nil
}
func (n *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "NodeGetVolumeStats not implement")
}

95
pkg/nvmf/nvmf.go Normal file
View File

@@ -0,0 +1,95 @@
package nvmf
import (
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/klog"
"k8s.io/utils/mount"
"os"
)
func AttachDisk(req *csi.NodePublishVolumeRequest, nm nvmfDiskMounter) (string, error) {
devicePath, err := Connect(nm.connector)
if err != nil {
klog.Errorf("AttachDisk: VolumeID %s failed to connect, Error: %v", req.VolumeId, err)
return "", err
}
klog.Infof("AttachDisk: Volume %s successful connected, Device%s", req.VolumeId, devicePath)
mntPath := nm.targetPath
klog.Infof("AttachDisk: MntPath: %s", mntPath)
notMounted, err := nm.mounter.IsLikelyNotMountPoint(mntPath)
if err != nil && !os.IsNotExist(err) {
return "", fmt.Errorf("Heuristic determination of mount point failed:%v", err)
}
if !notMounted {
klog.Infof("AttachDisk: VolumeID: %s, Path: %s is already mounted, device: %s", req.VolumeId, nm.targetPath)
return "", nil
}
if err := os.MkdirAll(mntPath, 0750); err != nil {
klog.Errorf("AttachDisk: failed to mkdir %s, error", mntPath)
return "", err
}
err = persistConnector(nm.connector, mntPath+".json")
if err != nil {
klog.Errorf("AttachDisk: failed to persist connection info: %v", err)
klog.Errorf("AttachDisk: disconnecting volume and failing the publish request because persistence file are required for unpublish volume")
return "", fmt.Errorf("unable to create persistence file for connection")
}
// Tips: use k8s mounter to mount fs and only support "ext4"
var options []string
if nm.readOnly {
options = append(options, "ro")
} else {
options = append(options, "rw")
}
options = append(options, nm.mountOptions...)
err = nm.mounter.FormatAndMount(devicePath, mntPath, nm.fsType, options)
if err != nil {
klog.Errorf("AttachDisk: failed to mount Device %s to %s with options: %v", devicePath, mntPath, options)
Disconnect(nm.connector)
removeConnector(mntPath)
return "", fmt.Errorf("failed to mount Device %s to %s with options: %v", devicePath, mntPath, options)
}
klog.Infof("AttachDisk: Successfully Mount Device %s to %s with options: %v", devicePath, mntPath, options)
return devicePath, nil
}
func DetachDisk(volumeID string, num *nvmfDiskUnMounter, targetPath string) error {
_, cnt, err := mount.GetDeviceNameFromMount(num.mounter, targetPath)
if err != nil {
klog.Errorf("nvmf detach disk: failed to get device from mnt: %s\nError: %v", targetPath, err)
return err
}
if pathExists, pathErr := mount.PathExists(targetPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
klog.Warningf("Warning: Unmount skipped because path does not exist: %v", targetPath)
return nil
}
if err = num.mounter.Unmount(targetPath); err != nil {
klog.Errorf("iscsi detach disk: failed to unmount: %s\nError: %v", targetPath, err)
return err
}
cnt--
if cnt != 0 {
return nil
}
connector, err := GetConnectorFromFile(targetPath + ".json")
if err != nil {
klog.Errorf("DetachDisk: failed to get connector from path %s Error: %v", targetPath, err)
return err
}
err = Disconnect(connector)
if err != nil {
klog.Errorf("DetachDisk: VolumeID: %s failed to disconnect, Error: %v", volumeID, err)
return err
}
removeConnector(targetPath)
return nil
}

84
pkg/nvmf/nvmf_utils.go Normal file
View File

@@ -0,0 +1,84 @@
package nvmf
import (
"csi-driver-nvmf/pkg/utils"
"context"
"fmt"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"k8s.io/klog"
"os"
"path/filepath"
"strings"
"time"
)
func waitForPathToExist(devicePath string, maxRetries, intervalSeconds int, deviceTransport string) (bool, error) {
var err error
for i := 0; i < maxRetries; i++ {
err = nil
if deviceTransport == "tcp" {
exist := utils.IsFileExisting(devicePath)
if exist {
return true, nil
}
} else {
return false, fmt.Errorf("connect only support tcp")
}
if i == maxRetries-1 {
break
}
time.Sleep(time.Second * time.Duration(intervalSeconds))
}
return false, err
}
func GetDeviceNameByVolumeID(volumeID string) (deviceName string, err error) {
volumeLinkPath := strings.Join([]string{"/dev/disk/by-id/nvmf-uuid", volumeID}, ".")
stat, err := os.Lstat(volumeLinkPath)
if err != nil {
if os.IsNotExist(err) {
return "", fmt.Errorf("volumeID link path %q not found", volumeLinkPath)
} else {
return "", fmt.Errorf("error getting stat of %q: %v", volumeLinkPath, err)
}
}
if stat.Mode()&os.ModeSymlink != os.ModeSymlink {
klog.Errorf("volumeID link file %q found, but was not a symlink", volumeLinkPath)
return "", fmt.Errorf("volumeID link file %q found, but was not a symlink", volumeLinkPath)
}
resolved, err := filepath.EvalSymlinks(volumeLinkPath)
if err != nil {
return "", fmt.Errorf("error reading target of symlink %q: %v", volumeLinkPath, err)
}
if !strings.HasPrefix(resolved, "/dev") {
return "", fmt.Errorf("resolved symlink for %q was unexpected: %q", volumeLinkPath, resolved)
}
log.Infof("Device Link Info: %s link to %s", volumeLinkPath, resolved)
tmp := strings.Split(resolved, "/")
return tmp[len(tmp)-1], nil
}
func parseDeviceToControllerPath(deviceName string) string {
nvmfControllerPrefix := "/sys/class/block"
index := strings.LastIndex(deviceName, "n")
parsed := deviceName[:index] + "c0" + deviceName[index:]
scanPath := filepath.Join(nvmfControllerPrefix, parsed, "device/rescan_controller")
return scanPath
}
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
klog.Infof("GRPC call: %s", info.FullMethod)
klog.Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
resp, err := handler(ctx, req)
if err != nil {
klog.Errorf("GRPC error: %v", err)
} else {
klog.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
}
return resp, err
}

94
pkg/nvmf/server.go Normal file
View File

@@ -0,0 +1,94 @@
package nvmf
import (
"csi-driver-nvmf/pkg/utils"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/klog"
"net"
"os"
"sync"
)
// Defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}
func NewNonBlockingGRPCServer() NonBlockingGRPCServer {
return &nonBlockingGRPCServer{}
}
// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
}
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
s.wg.Add(1)
go s.serve(endpoint, ids, cs, ns)
return
}
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := utils.ParseEndpoint(endpoint)
if err != nil {
klog.Fatal(err.Error())
}
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}
listener, err := net.Listen(proto, addr)
if err != nil {
klog.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
server := grpc.NewServer(opts...)
s.server = server
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
klog.Infof("Listening for connections on address: %#v", listener.Addr())
err = server.Serve(listener)
if err != nil {
klog.Fatalf("Failed to server: %v", err)
}
}