zfssa-csi-driver/pkg/service/node.go
2024-07-02 09:11:51 -06:00

273 lines
9.1 KiB
Go

/*
* Copyright (c) 2021, 2024, Oracle.
* Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
*/
package service
import (
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/oracle/zfssa-csi-driver/pkg/utils"
"github.com/oracle/zfssa-csi-driver/pkg/zfssarest"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"os"
)
var (
// nodeCaps represents the capability of node service.
nodeCaps = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
// csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_UNKNOWN,
}
)
func NewZFSSANodeServer(zd *ZFSSADriver) *csi.NodeServer {
zd.NodeMounter = newNodeMounter()
var ns csi.NodeServer = zd
return &ns
}
func (zd *ZFSSADriver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (
*csi.NodeStageVolumeResponse, error) {
utils.GetLogNODE(ctx, 5).Println("NodeStageVolume", "request", req)
// The request validity of the request is checked
VolumeID := req.GetVolumeId()
if len(VolumeID) == 0 {
utils.GetLogNODE(ctx, 2).Println("VolumeID not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}
targetPath := req.GetStagingTargetPath()
if len(targetPath) == 0 {
utils.GetLogNODE(ctx, 2).Println("Target path not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
reqCaps := req.GetVolumeCapability()
if reqCaps == nil {
utils.GetLogNODE(ctx, 2).Println("Capability not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Capability not provided")
}
// Not staging for either block or mount for now.
return &csi.NodeStageVolumeResponse{}, nil
}
func (zd *ZFSSADriver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (
*csi.NodeUnstageVolumeResponse, error) {
utils.GetLogNODE(ctx, 5).Println("NodeUnStageVolume", "request", req)
VolumeID := req.GetVolumeId()
if len(VolumeID) == 0 {
utils.GetLogNODE(ctx, 2).Println("VolumeID not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}
target := req.GetStagingTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}
// Check if target directory is a mount point. GetDeviceNameFromMount
// given a mnt point, finds the device from /proc/mounts
// returns the device name, reference count, and error code
dev, refCount, err := zd.NodeMounter.GetDeviceName(target)
if err != nil {
msg := fmt.Sprintf("failed to check if volume is mounted: %v", err)
return nil, status.Error(codes.Internal, msg)
}
// From the spec: If the volume corresponding to the volume_id
// is not staged to the staging_target_path, the Plugin MUST
// reply 0 OK.
if refCount == 0 {
utils.GetLogNODE(ctx, 3).Println("NodeUnstageVolume: target not mounted", "target", target)
return &csi.NodeUnstageVolumeResponse{}, nil
}
if refCount > 1 {
utils.GetLogNODE(ctx, 2).Println("NodeUnstageVolume: found references to device mounted at target path",
"references", refCount, "device", dev, "target", target)
}
utils.GetLogNODE(ctx, 5).Println("NodeUnstageVolume: unmounting target", "target", target)
err = zd.NodeMounter.Unmount(target)
if err != nil {
return nil, status.Errorf(codes.Internal, "Cannot unmount staging target %q: %v", target, err)
}
notMnt, mntErr := zd.NodeMounter.IsLikelyNotMountPoint(target)
if mntErr != nil {
utils.GetLogNODE(ctx, 2).Println("Cannot determine staging target path",
"staging_target_path", target, "error", err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if notMnt {
if err := os.Remove(target); err != nil {
utils.GetLogNODE(ctx, 2).Println("Cannot delete staging target path",
"staging_target_path", target, "error", err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
return &csi.NodeUnstageVolumeResponse{}, nil
}
func (zd *ZFSSADriver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (
*csi.NodePublishVolumeResponse, error) {
utils.GetLogNODE(ctx, 5).Println("NodePublishVolume", "request", req)
VolumeID := req.GetVolumeId()
if len(VolumeID) == 0 {
utils.GetLogNODE(ctx, 2).Println("VolumeID not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}
zVolumeId, err := utils.VolumeIdFromString(VolumeID)
if err != nil {
utils.GetLogNODE(ctx, 2).Println("NodePublishVolume Volume ID was invalid",
"volume_id", req.GetVolumeId(), "error", err.Error())
// NOTE: by spec, we should return success since there is nothing to delete
return nil, status.Error(codes.InvalidArgument, "Volume ID invalid")
}
source := req.GetStagingTargetPath()
if len(source) == 0 {
utils.GetLogNODE(ctx, 2).Println("Staging target path not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}
target := req.GetTargetPath()
if len(target) == 0 {
utils.GetLogNODE(ctx, 2).Println("Target path not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
utils.GetLogNODE(ctx, 2).Printf("NodePublishVolume: stagingTarget=%s, target=%s", source, target)
volCap := req.GetVolumeCapability()
if volCap == nil {
utils.GetLogNODE(ctx, 2).Println("Volume Capabilities path not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
}
// The account to be used for this operation is determined.
user, password, err := zd.getUserLogin(ctx, req.Secrets)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "Invalid credentials")
}
token := zfssarest.LookUpToken(ctx, user, password)
var mountOptions []string
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
}
if req.GetVolumeCapability().GetBlock() != nil {
mountOptions = append(mountOptions, "bind")
return zd.nodePublishBlockVolume(ctx, token, req, zVolumeId, mountOptions)
}
switch mode := volCap.GetAccessType().(type) {
case *csi.VolumeCapability_Block:
mountOptions = append(mountOptions, "bind")
return zd.nodePublishBlockVolume(ctx, token, req, zVolumeId, mountOptions)
case *csi.VolumeCapability_Mount:
return zd.nodePublishFileSystem(ctx, token, req, zVolumeId, mountOptions, mode)
default:
utils.GetLogNODE(ctx, 2).Println("Publish does not support Access Type", "access_type",
volCap.GetAccessType())
return nil, status.Error(codes.InvalidArgument, "Invalid access type")
}
}
func (zd *ZFSSADriver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (
*csi.NodeUnpublishVolumeResponse, error) {
utils.GetLogNODE(ctx, 5).Println("NodeUnpublishVolume", "request", req)
targetPath := req.GetTargetPath()
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
utils.GetLogNODE(ctx, 2).Println("VolumeID not provided, will return")
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}
zVolumeId, err := utils.VolumeIdFromString(volumeID)
if err != nil {
utils.GetLogNODE(ctx, 2).Println("Cannot unpublish volume",
"volume_id", req.GetVolumeId(), "error", err.Error())
return nil, err
}
user, password, err := zd.getUserLogin(ctx, nil)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "Invalid credentials")
}
token := zfssarest.LookUpToken(ctx, user, password)
if zVolumeId.IsBlock() {
return zd.nodeUnpublishBlockVolume(ctx, token, req, zVolumeId)
} else {
return zd.nodeUnpublishFilesystemVolume(token, ctx, req, zVolumeId)
}
}
func (zd *ZFSSADriver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (
*csi.NodeGetVolumeStatsResponse, error) {
utils.GetLogNODE(ctx, 5).Println("NodeGetVolumeStats", "request", req)
return nil, status.Error(codes.Unimplemented, "")
}
func (zd *ZFSSADriver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (
*csi.NodeExpandVolumeResponse, error) {
utils.GetLogNODE(ctx, 5).Println("NodeExpandVolume", "request", req)
return nil, status.Error(codes.Unimplemented, "")
}
func (zd *ZFSSADriver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (
*csi.NodeGetCapabilitiesResponse, error) {
utils.GetLogNODE(ctx, 5).Println("NodeGetCapabilities", "request", req)
var caps []*csi.NodeServiceCapability
for _, capacity := range nodeCaps {
c := &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: capacity,
},
},
}
caps = append(caps, c)
}
return &csi.NodeGetCapabilitiesResponse{Capabilities: caps}, nil
}
func (zd *ZFSSADriver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (
*csi.NodeGetInfoResponse, error) {
utils.GetLogNODE(ctx, 2).Println("NodeGetInfo", "request", req)
return &csi.NodeGetInfoResponse{
NodeId: zd.config.NodeName,
}, nil
}