feature: support request back_end controller to Create/Delete Volume

1. send a http request to Create/Delete Volume
2. a most based code.

Signed-off-by: Meinhard Zhou <zhouenhua@bytedance.com>
This commit is contained in:
Meinhard Zhou 2022-06-13 14:35:22 +08:00
parent 3c4f0d80d2
commit 51b762d9ca
9 changed files with 379 additions and 52 deletions

View File

@ -38,7 +38,9 @@ func init() {
flag.StringVar(&conf.DriverName, "drivername", nvmf.DefaultDriverName, "CSI Driver")
flag.StringVar(&conf.Region, "region", "test_region", "Region")
flag.StringVar(&conf.Version, "version", nvmf.DefaultDriverVersion, "Version")
flag.StringVar(&conf.NVMfVolumeMapDir, "nvmfVolumeMapDir", nvmf.DefaultVolumeMapPath, "Persistent volume")
flag.StringVar(&conf.NVMfBackendEndpoint, "nvmfBackendEndpoint", nvmf.DefaultBackendEndpoint, "NVMf Volume backend controller")
}
func main() {

View File

@ -0,0 +1,55 @@
package client
type CreateVolumeResponse struct {
ID string `json:"ID,omitempty"`
CapacityBytes uint64 `json:"capacityBytes,omitempty"`
TargetType string `json:"targetType,omitempty"`
TargetConfig *TargetConfig `json:"targetConfig,omitempty"`
}
type TargetConfig struct {
Hosts []*Host `json:"hosts,omitempty"`
Ports []*Port `json:"ports,omitempty"`
Subsystems []*Subsystem `json:"subsystems,omitempty"`
}
type Host struct {
NQN string `json:"NQN,omitempty"`
}
type Port struct {
Addr *Addr `json:"addr,omitempty"`
PortID uint64 `json:"portID,omitempty"`
Subsystems []string `json:"subsystems,omitempty"`
}
type Addr struct {
AdrFam string `json:"adrFam,omitempty"`
TrAddr string `json:"trAddr,omitempty"`
TrSvcID string `json:"trSvcID,omitempty"`
TrType string `json:"trType,omitempty"`
}
type Subsystem struct {
AllowedHosts []string `json:"AllowedHosts,omitempty"`
Attr *Attr `json:"attr,omitempty"`
Namespaces []*Namespace `json:"namespaces,omitempty"`
NQN string `json:"NQN,omitempty"`
}
type Attr struct {
AllowAnyHost string `json:"allowAnyHost,omitempty"`
Serial string `json:"serial,omitempty"`
}
type Namespace struct {
Device *Device `json:"device,omitempty"`
Enable uint32 `json:"enable,omitempty"`
NSID uint64 `json:"NSID,omitempty"`
}
type Device struct {
NGUID string `json:"NGUID,omitempty"`
Path string `json:"path,omitempty"`
UUID string `json:"UUID,omitempty"`
}

View File

@ -1,6 +1,11 @@
package client
import "net/http"
import (
"encoding/json"
"net/http"
"k8s.io/klog/v2"
)
type Response struct {
statusCode int
@ -8,3 +13,23 @@ type Response struct {
body []byte
err error
}
func (r *Response) StatusCode() int {
return r.statusCode
}
func (r *Response) Body() []byte {
return r.body
}
func (r *Response) Err() error {
return r.err
}
func (r *Response) Parse(m interface{}) error {
if r.err != nil {
return r.err
}
klog.Infof("Json unmarshal: %v", string(r.body))
return json.Unmarshal(r.body, m)
}

48
pkg/nvmf/config.go Normal file
View File

@ -0,0 +1,48 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nvmf
const (
NVMF_NQN_SIZE = 223
SYS_NVMF = "/sys/class/nvmf"
)
// Here erron
const (
ENOENT = 1 /* No such file or directory */
EINVAL = 2 /* Invalid argument */
)
const (
DefaultDriverName = "csi.nvmf.com"
DefaultDriverServicePort = "12230"
DefaultDriverVersion = "v1.0.0"
DefaultVolumeMapPath = "/var/lib/nvmf/volumes"
DefaultBackendEndpoint = ""
)
type GlobalConfig struct {
NVMfVolumeMapDir string
NVMfBackendEndpoint string
DriverName string
Region string
NodeID string
Endpoint string // CSI endpoint
Version string
IsControllerServer bool
LogLevel string
}

View File

@ -1,46 +1,9 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nvmf
const (
NVMF_NQN_SIZE = 223
SYS_NVMF = "/sys/class/nvmf"
TargetTrAddr = "targetTrAddr"
TargetTrPort = "targetTrPort"
TargetTrType = "targetTrType"
DeviceUUID = "deviceUUID"
NQN = "nqn"
)
// Here erron
const (
ENOENT = 1 /* No such file or directory */
EINVAL = 2 /* Invalid argument */
)
const (
DefaultDriverName = "csi.nvmf.com"
DefaultDriverServicePort = "12230"
DefaultDriverVersion = "v1.0.0"
DefaultVolumeMapPath = "/var/lib/nvmf/volumes"
)
type GlobalConfig struct {
NVMfVolumeMapDir string
DriverName string
Region string
NodeID string
Endpoint string // CSI endpoint
Version string
IsControllerServer bool
LogLevel string
}

View File

@ -17,13 +17,22 @@ limitations under the License.
package nvmf
import (
"encoding/json"
"net/http"
"os"
"path/filepath"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-driver-nvmf/pkg/client"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
)
// the map of VolumeName to PvName for idempotency.
var createdVolumeMap = map[string]*csi.Volume{}
type ControllerServer struct {
Driver *driver
}
@ -35,14 +44,148 @@ func NewControllerServer(d *driver) *ControllerServer {
}
}
// You should realize your volume provider here, such as requesting the Cloud to create an NVMf block and
// returning specific information to you
func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "CreateVolume should implement by yourself. ")
//1. check parameters
if err := c.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return nil, status.Error(codes.InvalidArgument, "CreateVolume: NVMf Driver not support create volume.")
}
if len(req.GetName()) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume: volume's name cannot be empty.")
}
if req.GetVolumeCapabilities() == nil {
return nil, status.Errorf(codes.InvalidArgument, "CreateVolume: volume %s's capabilities cannot be empty.", req.GetName())
}
if req.GetCapacityRange() == nil {
return nil, status.Errorf(codes.InvalidArgument, "CreateVolume: volume %s's capacityRange cannot be empty.", req.GetName())
}
volSizeBytes := int64(req.GetCapacityRange().RequiredBytes)
createVolArgs := req.GetParameters()
createVolReq, err := ParseCreateVolumeParameters(createVolArgs)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "CreateVolume: volume %s's parameters parsed error: %s", req.GetName(), err)
}
// 2. if created, return created volume info
oldVol, ok := createdVolumeMap[req.GetName()]
if ok {
// todo: check more vol context like permission?
// oldVolContext := oldVol.GetVolumeContext()
klog.Warningf("CreateVolume: volume %s has already created and volumeId: %s", req.GetName(), oldVol.VolumeId)
if oldVol.CapacityBytes != volSizeBytes {
return nil, status.Errorf(codes.InvalidArgument, "CreateVolume: the exist vol-%s's size %d is different from the requested volume %s's size %d.", oldVol.VolumeId, oldVol.CapacityBytes, req.GetName(), volSizeBytes)
}
return &csi.CreateVolumeResponse{
Volume: oldVol,
}, nil
}
// 3. if not created, request backend controller to create a new volume
createVolReq.Name = req.GetName()
createVolReq.SizeByte = req.GetCapacityRange().RequiredBytes
volReqBody, err := json.Marshal(createVolReq)
if err != nil {
return nil, status.Errorf(codes.Internal, "CreateVolume: json marshal error: %s", err)
}
response := c.Driver.client.Post().Action("/volume/create").Body(volReqBody).Do()
klog.Infof("CreateVolume:create volume backend response's statuscode: %d, res: %s", response.StatusCode(), string(response.Body()))
if err := response.Err(); err != nil {
return nil, status.Errorf(codes.Internal, "CreateVolume: backend response error: %s", err)
}
if response.StatusCode() != http.StatusOK {
return nil, status.Errorf(codes.Internal, "CreateVolume: backend has reponse but failed for volume %s", req.GetName())
}
var createVolumeResponse client.CreateVolumeResponse
err = response.Parse(&createVolumeResponse)
if err != nil {
return nil, status.Errorf(codes.Internal, "CreateVolume: response parse error: %s", err)
}
klog.Infof("CreateVolume: createVolume success for volume %s, volume ID: %s", req.GetName(), createVolumeResponse.ID)
volContext, _ := GetVolumeContext(&createVolumeResponse)
tmpVolume := &csi.Volume{
VolumeId: createVolumeResponse.ID,
CapacityBytes: int64(createVolumeResponse.CapacityBytes),
VolumeContext: volContext,
}
createdVolumeMap[createVolumeResponse.ID] = tmpVolume
err = PersistVolumeInfo(tmpVolume, filepath.Join(c.Driver.volumeMapDir, tmpVolume.GetVolumeId()))
if err != nil {
klog.Warningf("CreateVolume: create volume %s success, but persistent error: %s", req.GetName(), err)
}
return &csi.CreateVolumeResponse{Volume: tmpVolume}, nil
}
func (c *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "DeleteVolume should implement by yourself. ")
// 1. check parameters
if err := c.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "DeleteVolume: NVMf Driver not support delete volume.")
}
if len(req.GetVolumeId()) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "DeleteVolume: delete req's volume ID can't be empty.")
}
if req.GetSecrets() == nil {
return nil, status.Errorf(codes.InvalidArgument, "DeleteVolume: delete req's secrets can't be empty")
}
var volume *csi.Volume
var err error
volumeMapFilePath := filepath.Join(c.Driver.volumeMapDir, req.GetVolumeId())
volume, ok := createdVolumeMap[req.GetVolumeId()]
if !ok {
klog.Errorf("DeleteVolume: can't find the vol-%s in driver cache", req.GetVolumeId())
volume, err = GetVolumeInfoFromFile(volumeMapFilePath)
if err != nil {
return nil, status.Errorf(codes.NotFound, "DeleteVolume: can't get vol-%s info from cache or file, not exist.", req.GetVolumeId())
}
klog.Warningf("DeleteVolume: get vol-%s from file.", req.GetVolumeId())
}
//todo: P0-delete should add some permission check.
deleteVolReq := &DeleteVolumeRequest{
VolumeId: volume.VolumeId,
}
deleteVolBody, err := json.Marshal(deleteVolReq)
if err != nil {
return nil, status.Errorf(codes.Internal, "DeletedVolume: json marshal error: %s", err)
}
response := c.Driver.client.Post().Action("/volume/delete").Body(deleteVolBody).Do()
klog.Infof("DeleteVolume: delete volume backend response's statuscode: %d, res: %s", response.StatusCode(), string(response.Body()))
if response.StatusCode() != http.StatusOK {
if response.StatusCode() == 401 {
klog.Errorf("DeleteVolume: no permission to delete vol-%s", volume.VolumeId)
}
return nil, status.Errorf(codes.Internal, "DeleteVolume: backend has response but not success")
}
delete(createdVolumeMap, volume.VolumeId)
err = os.Remove(volumeMapFilePath)
if err != nil {
klog.Warningf("DeleteVolume: can't remove vol-%s mapping file %s for error: %s.", volume.VolumeId, volumeMapFilePath, err)
}
klog.Infof("DeleteVolume: delete vol-%s success.", volume.VolumeId)
return &csi.DeleteVolumeResponse{}, nil
}
func (c *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {

View File

@ -20,6 +20,7 @@ import (
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-driver-nvmf/pkg/client"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
@ -37,6 +38,8 @@ type driver struct {
nodeServer *NodeServer
controllerServer *ControllerServer
client *client.Client
cap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
}
@ -59,6 +62,7 @@ func NewDriver(conf *GlobalConfig) *driver {
}
func (d *driver) Run(conf *GlobalConfig) {
var err error
d.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
@ -71,6 +75,11 @@ func (d *driver) Run(conf *GlobalConfig) {
d.idServer = NewIdentityServer(d)
d.nodeServer = NewNodeServer(d)
if conf.IsControllerServer {
d.client, err = client.NewClient(conf.NVMfBackendEndpoint)
if err != nil {
klog.Fatal("create http client failed.")
return
}
d.controllerServer = NewControllerServer(d)
}

View File

@ -55,11 +55,11 @@ func getNVMfDiskInfo(req *csi.NodePublishVolumeRequest) (*nvmfDiskInfo, error) {
volName := req.GetVolumeId()
volOpts := req.GetVolumeContext()
targetTrAddr := volOpts["targetTrAddr"]
targetTrPort := volOpts["targetTrPort"]
targetTrType := volOpts["targetTrType"]
deviceUUID := volOpts["deviceUUID"]
nqn := volOpts["nqn"]
targetTrAddr := volOpts[TargetTrAddr]
targetTrPort := volOpts[TargetTrPort]
targetTrType := volOpts[TargetTrType]
deviceUUID := volOpts[DeviceUUID]
nqn := volOpts[NQN]
if targetTrAddr == "" || nqn == "" || targetTrPort == "" || targetTrType == "" {
return nil, fmt.Errorf("Some Nvme target info is missing, volID: %s ", volName)

82
pkg/nvmf/volume.go Normal file
View File

@ -0,0 +1,82 @@
package nvmf
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-driver-nvmf/pkg/client"
)
//todo: add allowHostNqn request para
type CreateVolumeRequest struct {
Name string
SizeByte int64
// AllowHostNqn string
ReadOnly bool
}
type DeleteVolumeRequest struct {
VolumeId string
}
func ParseCreateVolumeParameters(parameters map[string]string) (volReq *CreateVolumeRequest, err error) {
//todo: need more parameters for nvmf
// readonly
readonly, ok := parameters["readonly"]
if !ok {
volReq.ReadOnly = false
} else {
readonly = strings.ToLower(readonly)
if readonly == "yes" || readonly == "true" || readonly == "1" {
volReq.ReadOnly = true
} else {
volReq.ReadOnly = false
}
}
return volReq, nil
}
func GetVolumeContext(volRes *client.CreateVolumeResponse) (map[string]string, error) {
//todo: volume context parse failed?
volContext := make(map[string]string)
volContext[TargetTrAddr] = volRes.TargetConfig.Ports[0].Addr.TrAddr
volContext[TargetTrPort] = volRes.TargetConfig.Ports[0].Addr.TrSvcID
volContext[TargetTrType] = volRes.TargetConfig.Ports[0].Addr.TrAddr
volContext[DeviceUUID] = volRes.TargetConfig.Subsystems[0].Namespaces[0].Device.UUID
volContext[NQN] = volRes.TargetConfig.Subsystems[0].NQN
return volContext, nil
}
func PersistVolumeInfo(v *csi.Volume, filePath string) error {
f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating nvme volume persistence file %s: %v", filePath, err)
}
defer f.Close()
encoder := json.NewEncoder(f)
if err = encoder.Encode(v); err != nil {
return fmt.Errorf("error encoding volume: %v", err)
}
return nil
}
func GetVolumeInfoFromFile(filePath string) (*csi.Volume, error) {
f, err := ioutil.ReadFile(filePath)
if err != nil {
return nil, err
}
data := csi.Volume{}
err = json.Unmarshal([]byte(f), &data)
if err != nil {
return nil, err
}
return &data, nil
}