Files
Patrick Ohly 5b63199f46 e2e storage: add Rename to PodIO
This is useful for atomic file creation: first create a temporary file, then
rename it.
2022-09-30 13:25:26 +02:00

284 lines
7.0 KiB
Go

/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/cache"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
// Name is the name of the CSI plug-in.
Name = "io.kubernetes.storage.mock"
// VendorVersion is the version returned by GetPluginInfo.
VendorVersion = "0.3.0"
// TopologyKey simulates a per-node topology.
TopologyKey = Name + "/node"
// TopologyValue is the one, fixed node on which the driver runs.
TopologyValue = "some-mock-node"
)
// Manifest is the SP's manifest.
var Manifest = map[string]string{
"url": "https://github.com/kubernetes/kubernetes/tree/master/test/e2e/storage/drivers/csi-test/mock",
}
type Config struct {
DisableAttach bool
DriverName string
AttachLimit int64
NodeExpansionRequired bool
VolumeMountGroupRequired bool
DisableControllerExpansion bool
DisableOnlineExpansion bool
PermissiveTargetPath bool
EnableTopology bool
IO DirIO
}
// DirIO is an abstraction over direct os calls.
type DirIO interface {
// DirExists returns false if the path doesn't exist, true if it exists and is a directory, an error otherwise.
DirExists(path string) (bool, error)
// Mkdir creates the directory, but not its parents, with 0755 permissions.
Mkdir(path string) error
// RemoveAll removes the path and everything contained inside it. It's not an error if the path does not exist.
RemoveAll(path string) error
// Rename changes the name of a file or directory. The parent directory
// of newPath must exist.
Rename(oldPath, newPath string) error
}
type OSDirIO struct{}
func (o OSDirIO) DirExists(path string) (bool, error) {
info, err := os.Stat(path)
switch {
case err == nil && !info.IsDir():
return false, fmt.Errorf("%s: not a directory", path)
case err == nil:
return true, nil
case os.IsNotExist(err):
return false, nil
default:
return false, err
}
}
func (o OSDirIO) Mkdir(path string) error {
return os.Mkdir(path, os.FileMode(0755))
}
func (o OSDirIO) RemoveAll(path string) error {
return os.RemoveAll(path)
}
func (o OSDirIO) Rename(oldPath, newPath string) error {
return os.Rename(oldPath, newPath)
}
// Service is the CSI Mock service provider.
type Service interface {
csi.ControllerServer
csi.IdentityServer
csi.NodeServer
}
type service struct {
sync.Mutex
nodeID string
vols []csi.Volume
volsRWL sync.RWMutex
volsNID uint64
snapshots cache.SnapshotCache
snapshotsNID uint64
config Config
}
type Volume struct {
VolumeCSI csi.Volume
NodeID string
ISStaged bool
ISPublished bool
ISEphemeral bool
ISControllerPublished bool
StageTargetPath string
TargetPath string
}
var MockVolumes map[string]Volume
// New returns a new Service.
func New(config Config) Service {
s := &service{
nodeID: config.DriverName,
config: config,
}
if s.config.IO == nil {
s.config.IO = OSDirIO{}
}
s.snapshots = cache.NewSnapshotCache()
s.vols = []csi.Volume{
s.newVolume("Mock Volume 1", gib100),
s.newVolume("Mock Volume 2", gib100),
s.newVolume("Mock Volume 3", gib100),
}
MockVolumes = map[string]Volume{}
s.snapshots.Add(s.newSnapshot("Mock Snapshot 1", "1", map[string]string{"Description": "snapshot 1"}))
s.snapshots.Add(s.newSnapshot("Mock Snapshot 2", "2", map[string]string{"Description": "snapshot 2"}))
s.snapshots.Add(s.newSnapshot("Mock Snapshot 3", "3", map[string]string{"Description": "snapshot 3"}))
return s
}
const (
kib int64 = 1024
mib int64 = kib * 1024
gib int64 = mib * 1024
gib100 int64 = gib * 100
tib int64 = gib * 1024
)
func (s *service) newVolume(name string, capcity int64) csi.Volume {
vol := csi.Volume{
VolumeId: fmt.Sprintf("%d", atomic.AddUint64(&s.volsNID, 1)),
VolumeContext: map[string]string{"name": name},
CapacityBytes: capcity,
}
s.setTopology(&vol)
return vol
}
func (s *service) newVolumeFromSnapshot(name string, capacity int64, snapshotID int) csi.Volume {
vol := s.newVolume(name, capacity)
vol.ContentSource = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Snapshot{
Snapshot: &csi.VolumeContentSource_SnapshotSource{
SnapshotId: fmt.Sprintf("%d", snapshotID),
},
},
}
s.setTopology(&vol)
return vol
}
func (s *service) newVolumeFromVolume(name string, capacity int64, volumeID int) csi.Volume {
vol := s.newVolume(name, capacity)
vol.ContentSource = &csi.VolumeContentSource{
Type: &csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: fmt.Sprintf("%d", volumeID),
},
},
}
s.setTopology(&vol)
return vol
}
func (s *service) setTopology(vol *csi.Volume) {
if s.config.EnableTopology {
vol.AccessibleTopology = []*csi.Topology{
{
Segments: map[string]string{
TopologyKey: TopologyValue,
},
},
}
}
}
func (s *service) findVol(k, v string) (volIdx int, volInfo csi.Volume) {
s.volsRWL.RLock()
defer s.volsRWL.RUnlock()
return s.findVolNoLock(k, v)
}
func (s *service) findVolNoLock(k, v string) (volIdx int, volInfo csi.Volume) {
volIdx = -1
for i, vi := range s.vols {
switch k {
case "id":
if strings.EqualFold(v, vi.GetVolumeId()) {
return i, vi
}
case "name":
if n, ok := vi.VolumeContext["name"]; ok && strings.EqualFold(v, n) {
return i, vi
}
}
}
return
}
func (s *service) findVolByName(
ctx context.Context, name string) (int, csi.Volume) {
return s.findVol("name", name)
}
func (s *service) findVolByID(
ctx context.Context, id string) (int, csi.Volume) {
return s.findVol("id", id)
}
func (s *service) newSnapshot(name, sourceVolumeId string, parameters map[string]string) cache.Snapshot {
ptime := timestamppb.Now()
return cache.Snapshot{
Name: name,
Parameters: parameters,
SnapshotCSI: csi.Snapshot{
SnapshotId: fmt.Sprintf("%d", atomic.AddUint64(&s.snapshotsNID, 1)),
CreationTime: ptime,
SourceVolumeId: sourceVolumeId,
ReadyToUse: true,
},
}
}
// getAttachCount returns the number of attached volumes on the node.
func (s *service) getAttachCount(devPathKey string) int64 {
var count int64
for _, v := range s.vols {
if device := v.VolumeContext[devPathKey]; device != "" {
count++
}
}
return count
}
func (s *service) execHook(hookName string) (codes.Code, string) {
return codes.OK, ""
}