remove CSI-migration gate

This commit is contained in:
cyclinder 2023-04-03 10:55:24 +08:00
parent eca1f9d2d5
commit 8e4228a8c1
20 changed files with 9 additions and 2794 deletions

View File

@ -27,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/azure_file"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/gcepd"
"k8s.io/kubernetes/pkg/volume/portworx"
"k8s.io/kubernetes/pkg/volume/rbd"
"k8s.io/kubernetes/pkg/volume/vsphere_volume"
@ -62,7 +61,6 @@ type pluginInfo struct {
func appendAttachableLegacyProviderVolumes(logger klog.Logger, allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
pluginMigrationStatus := make(map[string]pluginInfo)
pluginMigrationStatus[plugins.GCEPDInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationGCE, pluginUnregisterFeature: features.InTreePluginGCEUnregister, pluginProbeFunction: gcepd.ProbeVolumePlugins}
pluginMigrationStatus[plugins.VSphereInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationvSphere, pluginUnregisterFeature: features.InTreePluginvSphereUnregister, pluginProbeFunction: vsphere_volume.ProbeVolumePlugins}
pluginMigrationStatus[plugins.PortworxVolumePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationPortworx, pluginUnregisterFeature: features.InTreePluginPortworxUnregister, pluginProbeFunction: portworx.ProbeVolumePlugins}
pluginMigrationStatus[plugins.RBDVolumePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationRBD, pluginUnregisterFeature: features.InTreePluginRBDUnregister, pluginProbeFunction: rbd.ProbeVolumePlugins}

View File

@ -31,7 +31,6 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/azure_file"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/gcepd"
"k8s.io/kubernetes/pkg/volume/portworx"
"k8s.io/kubernetes/pkg/volume/rbd"
"k8s.io/kubernetes/pkg/volume/vsphere_volume"
@ -66,7 +65,6 @@ type pluginInfo struct {
func appendLegacyProviderVolumes(allPlugins []volume.VolumePlugin, featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
pluginMigrationStatus := make(map[string]pluginInfo)
pluginMigrationStatus[plugins.GCEPDInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationGCE, pluginUnregisterFeature: features.InTreePluginGCEUnregister, pluginProbeFunction: gcepd.ProbeVolumePlugins}
pluginMigrationStatus[plugins.AzureFileInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationAzureFile, pluginUnregisterFeature: features.InTreePluginAzureFileUnregister, pluginProbeFunction: azure_file.ProbeVolumePlugins}
pluginMigrationStatus[plugins.VSphereInTreePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationvSphere, pluginUnregisterFeature: features.InTreePluginvSphereUnregister, pluginProbeFunction: vsphere_volume.ProbeVolumePlugins}
pluginMigrationStatus[plugins.PortworxVolumePluginName] = pluginInfo{pluginMigrationFeature: features.CSIMigrationPortworx, pluginUnregisterFeature: features.InTreePluginPortworxUnregister, pluginProbeFunction: portworx.ProbeVolumePlugins}

View File

@ -130,14 +130,6 @@ const (
// Enables the Azure File in-tree driver to Azure File Driver migration feature.
CSIMigrationAzureFile featuregate.Feature = "CSIMigrationAzureFile"
// owner: @davidz627
// alpha: v1.14
// beta: v1.17
// GA: 1.25
//
// Enables the GCE PD in-tree driver to GCE CSI Driver migration feature.
CSIMigrationGCE featuregate.Feature = "CSIMigrationGCE"
// owner: @mfordjody
// alpha: v1.26
//
@ -880,8 +872,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
CSIMigrationAzureFile: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28
CSIMigrationGCE: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.27
CSIMigrationPortworx: {Default: false, PreRelease: featuregate.Beta}, // Off by default (requires Portworx CSI driver)
CSIMigrationRBD: {Default: false, PreRelease: featuregate.Alpha}, // Off by default (requires RBD CSI driver)

View File

@ -92,7 +92,6 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/gcepd"
_ "k8s.io/kubernetes/pkg/volume/hostpath"
volumesecret "k8s.io/kubernetes/pkg/volume/secret"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -368,7 +367,6 @@ func newTestKubeletWithImageList(
if initFakeVolumePlugin {
allPlugins = append(allPlugins, plug)
} else {
allPlugins = append(allPlugins, gcepd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, volumesecret.ProbeVolumePlugins()...)
}

View File

@ -46,9 +46,7 @@ func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string) bool {
return false
}
case csilibplugins.GCEPDInTreePluginName:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) {
return false
}
return true
case csilibplugins.AzureDiskInTreePluginName:
return true
case csilibplugins.CinderInTreePluginName:

View File

@ -1088,7 +1088,7 @@ func isCSIMigrationOnForPlugin(pluginName string) bool {
case csiplugins.AWSEBSInTreePluginName:
return true
case csiplugins.GCEPDInTreePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
return true
case csiplugins.AzureDiskInTreePluginName:
return true
case csiplugins.CinderInTreePluginName:

View File

@ -217,7 +217,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
var migratedPlugins = map[string](func() bool){
csitranslationplugins.GCEPDInTreePluginName: func() bool {
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
return true
},
csitranslationplugins.AWSEBSInTreePluginName: func() bool {
return true

View File

@ -91,7 +91,7 @@ func (pm PluginManager) IsMigrationEnabledForPlugin(pluginName string) bool {
case csilibplugins.AWSEBSInTreePluginName:
return true
case csilibplugins.GCEPDInTreePluginName:
return pm.featureGate.Enabled(features.CSIMigrationGCE)
return true
case csilibplugins.AzureFileInTreePluginName:
return pm.featureGate.Enabled(features.CSIMigrationAzureFile)
case csilibplugins.AzureDiskInTreePluginName:

View File

@ -104,9 +104,8 @@ func TestMigrationFeatureFlagStatus(t *testing.T) {
csiMigrationCompleteResult bool
}{
{
name: "gce-pd migration flag enabled and migration-complete flag disabled with CSI migration flag enabled",
name: "gce-pd migration flag enabled and migration-complete flag disabled with CSI migration flag",
pluginName: "kubernetes.io/gce-pd",
pluginFeature: features.CSIMigrationGCE,
pluginFeatureEnabled: true,
csiMigrationEnabled: true,
inTreePluginUnregister: features.InTreePluginGCEUnregister,
@ -115,9 +114,8 @@ func TestMigrationFeatureFlagStatus(t *testing.T) {
csiMigrationCompleteResult: false,
},
{
name: "gce-pd migration flag enabled and migration-complete flag enabled with CSI migration flag enabled",
name: "gce-pd migration flag enabled and migration-complete flag enabled with CSI migration flag",
pluginName: "kubernetes.io/gce-pd",
pluginFeature: features.CSIMigrationGCE,
pluginFeatureEnabled: true,
csiMigrationEnabled: true,
inTreePluginUnregister: features.InTreePluginGCEUnregister,
@ -155,7 +153,7 @@ func TestMigrationFeatureFlagStatus(t *testing.T) {
// CSIMigrationGCE is locked to on, so it cannot be enabled or disabled. There are a couple
// of test cases that check correct behavior when CSIMigrationGCE is enabled, but there are
// no longer any tests cases for CSIMigrationGCE being disabled as that is not possible.
if test.pluginFeature != features.CSIMigrationGCE {
if len(test.pluginFeature) > 0 {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, test.pluginFeature, test.pluginFeatureEnabled)()
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, test.inTreePluginUnregister, test.inTreePluginUnregisterEnabled)()

View File

@ -1,13 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- saad-ali
- thockin
reviewers:
- saad-ali
- jsafrane
- jingxu97
- gnufied
- msau42
emeritus_approvers:
- davidz627

View File

@ -1,410 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import (
"encoding/json"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"time"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/legacy-cloud-providers/gce"
)
type gcePersistentDiskAttacher struct {
host volume.VolumeHost
gceDisks gce.Disks
}
var _ volume.Attacher = &gcePersistentDiskAttacher{}
var _ volume.DeviceMounter = &gcePersistentDiskAttacher{}
var _ volume.AttachableVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.DeviceMountableVolumePlugin = &gcePersistentDiskPlugin{}
func (plugin *gcePersistentDiskPlugin) NewAttacher() (volume.Attacher, error) {
gceCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
if err != nil {
return nil, err
}
return &gcePersistentDiskAttacher{
host: plugin.host,
gceDisks: gceCloud,
}, nil
}
func (plugin *gcePersistentDiskPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
return plugin.NewAttacher()
}
func (plugin *gcePersistentDiskPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
return mounter.GetMountRefs(deviceMountPath)
}
// Attach checks with the GCE cloud provider if the specified volume is already
// attached to the node with the specified Name.
// If the volume is attached, it succeeds (returns nil).
// If it is not, Attach issues a call to the GCE cloud provider to attach it.
// Callers are responsible for retrying on failure.
// Callers are responsible for thread safety between concurrent attach and
// detach operations.
func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
volumeSource, readOnly, err := getVolumeSource(spec)
if err != nil {
return "", err
}
pdName := volumeSource.PDName
attached, err := attacher.gceDisks.DiskIsAttached(pdName, nodeName)
if err != nil {
// Log error and continue with attach
klog.Errorf(
"Error checking if PD (%q) is already attached to current node (%q). Will continue and try attach anyway. err=%v",
pdName, nodeName, err)
}
if err == nil && attached {
// Volume is already attached to node.
klog.Infof("Attach operation is successful. PD %q is already attached to node %q.", pdName, nodeName)
} else {
if err := attacher.gceDisks.AttachDisk(pdName, nodeName, readOnly, isRegionalPD(spec)); err != nil {
klog.Errorf("Error attaching PD %q to node %q: %+v", pdName, nodeName, err)
return "", err
}
}
return filepath.Join(diskByIDPath, diskGooglePrefix+pdName), nil
}
func (attacher *gcePersistentDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumePdNameMap := make(map[string]*volume.Spec)
pdNameList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
// If error is occurred, skip this volume and move to the next one
if err != nil {
klog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
pdNameList = append(pdNameList, volumeSource.PDName)
volumesAttachedCheck[spec] = true
volumePdNameMap[volumeSource.PDName] = spec
}
attachedResult, err := attacher.gceDisks.DisksAreAttached(pdNameList, nodeName)
if err != nil {
// Log error and continue with attach
klog.Errorf(
"Error checking if PDs (%v) are already attached to current node (%q). err=%v",
pdNameList, nodeName, err)
return volumesAttachedCheck, err
}
for pdName, attached := range attachedResult {
if !attached {
spec := volumePdNameMap[pdName]
volumesAttachedCheck[spec] = false
klog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", pdName, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *gcePersistentDiskAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
diskNamesByNode := make(map[types.NodeName][]string)
volumeSpecToDiskName := make(map[*volume.Spec]string)
for nodeName, volumeSpecs := range volumesByNode {
diskNames := []string{}
for _, spec := range volumeSpecs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
klog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
diskNames = append(diskNames, volumeSource.PDName)
volumeSpecToDiskName[spec] = volumeSource.PDName
}
diskNamesByNode[nodeName] = diskNames
}
attachedDisksByNode, err := attacher.gceDisks.BulkDisksAreAttached(diskNamesByNode)
if err != nil {
return nil, err
}
for nodeName, volumeSpecs := range volumesByNode {
volumesAreAttachedToNode := make(map[*volume.Spec]bool)
for _, spec := range volumeSpecs {
diskName := volumeSpecToDiskName[spec]
volumesAreAttachedToNode[spec] = attachedDisksByNode[nodeName][diskName]
}
volumesAttachedCheck[nodeName] = volumesAreAttachedToNode
}
return volumesAttachedCheck, nil
}
// search Windows disk number by LUN
func getDiskID(pdName string, exec utilexec.Interface) (string, error) {
// TODO: replace Get-GcePdName with native windows support of Get-Disk, see issue #74674
cmd := `Get-GcePdName | select Name, DeviceId | ConvertTo-Json`
output, err := exec.Command("powershell", "/c", cmd).CombinedOutput()
if err != nil {
klog.Errorf("Get-GcePdName failed, error: %v, output: %q", err, string(output))
err = errors.New(err.Error() + " " + string(output))
return "", err
}
var data []map[string]interface{}
if err = json.Unmarshal(output, &data); err != nil {
klog.Errorf("Get-Disk output is not a json array, output: %q", string(output))
return "", err
}
for _, pd := range data {
if jsonName, ok := pd["Name"]; ok {
if name, ok := jsonName.(string); ok {
if name == pdName {
klog.Infof("found the disk %q", name)
if diskNum, ok := pd["DeviceId"]; ok {
switch v := diskNum.(type) {
case int:
return strconv.Itoa(v), nil
case float64:
return strconv.Itoa(int(v)), nil
case string:
return v, nil
default:
// diskNum isn't one of the types above
klog.Warningf("Disk %q found, but disknumber (%q) is not in one of the recongnized type", name, diskNum)
}
}
}
}
}
}
return "", fmt.Errorf("could not found disk number for disk %q", pdName)
}
func (attacher *gcePersistentDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, _ *v1.Pod, timeout time.Duration) (string, error) {
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
pdName := volumeSource.PDName
if runtime.GOOS == "windows" {
exec := attacher.host.GetExec(gcePersistentDiskPluginName)
id, err := getDiskID(pdName, exec)
if err != nil {
klog.Errorf("WaitForAttach (windows) failed with error %s", err)
return "", err
}
return id, nil
}
partition := ""
if volumeSource.Partition != 0 {
partition = strconv.Itoa(int(volumeSource.Partition))
}
sdBefore, err := filepath.Glob(diskSDPattern)
if err != nil {
klog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
}
sdBeforeSet := sets.NewString(sdBefore...)
devicePaths := getDiskByIDPaths(pdName, partition)
for {
select {
case <-ticker.C:
klog.V(5).Infof("Checking GCE PD %q is attached.", pdName)
path, err := verifyDevicePath(devicePaths, sdBeforeSet, pdName)
if err != nil {
// Log error, if any, and continue checking periodically. See issue #11321
klog.Errorf("Error verifying GCE PD (%q) is attached: %v", pdName, err)
} else if path != "" {
// A device path has successfully been created for the PD
klog.Infof("Successfully found attached GCE PD %q.", pdName)
return path, nil
} else {
klog.V(4).Infof("could not verify GCE PD (%q) is attached, device path does not exist", pdName)
}
case <-timer.C:
return "", fmt.Errorf("could not find attached GCE PD %q. Timeout waiting for mount paths to be created", pdName)
}
}
}
func (attacher *gcePersistentDiskAttacher) GetDeviceMountPath(
spec *volume.Spec) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
return makeGlobalPDName(attacher.host, volumeSource.PDName), nil
}
func (attacher *gcePersistentDiskAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, _ volume.DeviceMounterArgs) error {
// Only mount the PD globally once.
mounter := attacher.host.GetMounter(gcePersistentDiskPluginName)
notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath)
if err != nil {
if os.IsNotExist(err) {
dir := deviceMountPath
if runtime.GOOS == "windows" {
// in windows, as we use mklink, only need to MkdirAll for parent directory
dir = filepath.Dir(deviceMountPath)
}
if err := os.MkdirAll(dir, 0750); err != nil {
return fmt.Errorf("MountDevice:CreateDirectory failed with %s", err)
}
notMnt = true
} else {
return err
}
}
volumeSource, readOnly, err := getVolumeSource(spec)
if err != nil {
return err
}
options := []string{}
if readOnly {
options = append(options, "ro")
}
if notMnt {
diskMounter := volumeutil.NewSafeFormatAndMountFromHost(gcePersistentDiskPluginName, attacher.host)
mountOptions := volumeutil.MountOptionFromSpec(spec, options...)
err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, mountOptions)
if err != nil {
os.Remove(deviceMountPath)
return err
}
klog.V(4).Infof("formatting spec %v devicePath %v deviceMountPath %v fs %v with options %+v", spec.Name(), devicePath, deviceMountPath, volumeSource.FSType, options)
}
return nil
}
type gcePersistentDiskDetacher struct {
host volume.VolumeHost
gceDisks gce.Disks
}
var _ volume.Detacher = &gcePersistentDiskDetacher{}
var _ volume.DeviceUnmounter = &gcePersistentDiskDetacher{}
func (plugin *gcePersistentDiskPlugin) NewDetacher() (volume.Detacher, error) {
gceCloud, err := getCloudProvider(plugin.host.GetCloudProvider())
if err != nil {
return nil, err
}
return &gcePersistentDiskDetacher{
host: plugin.host,
gceDisks: gceCloud,
}, nil
}
func (plugin *gcePersistentDiskPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
return plugin.NewDetacher()
}
// Detach checks with the GCE cloud provider if the specified volume is already
// attached to the specified node. If the volume is not attached, it succeeds
// (returns nil). If it is attached, Detach issues a call to the GCE cloud
// provider to attach it.
// Callers are responsible for retrying on failure.
// Callers are responsible for thread safety between concurrent attach and detach
// operations.
func (detacher *gcePersistentDiskDetacher) Detach(volumeName string, nodeName types.NodeName) error {
pdName := path.Base(volumeName)
attached, err := detacher.gceDisks.DiskIsAttached(pdName, nodeName)
if err != nil {
// Log error and continue with detach
klog.Errorf(
"Error checking if PD (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
pdName, nodeName, err)
}
if err == nil && !attached {
// Volume is not attached to node. Success!
klog.Infof("Detach operation is successful. PD %q was not attached to node %q.", pdName, nodeName)
return nil
}
if err = detacher.gceDisks.DetachDisk(pdName, nodeName); err != nil {
klog.Errorf("Error detaching PD %q from node %q: %v", pdName, nodeName, err)
return err
}
return nil
}
func (detacher *gcePersistentDiskDetacher) UnmountDevice(deviceMountPath string) error {
if runtime.GOOS == "windows" {
// Flush data cache for windows because it does not do so automatically during unmount device
exec := detacher.host.GetExec(gcePersistentDiskPluginName)
err := volumeutil.WriteVolumeCache(deviceMountPath, exec)
if err != nil {
return err
}
}
return mount.CleanupMountPoint(deviceMountPath, detacher.host.GetMounter(gcePersistentDiskPluginName), false)
}
func (plugin *gcePersistentDiskPlugin) CanAttach(spec *volume.Spec) (bool, error) {
return true, nil
}
func (plugin *gcePersistentDiskPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
return true, nil
}

View File

@ -1,614 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import (
"errors"
"fmt"
"os"
"path/filepath"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
cloudvolume "k8s.io/cloud-provider/volume"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/legacy-cloud-providers/gce"
"strings"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
func TestGetDeviceName_Volume(t *testing.T) {
plugin := newPlugin(t)
name := "my-pd-volume"
spec := createVolSpec(name, false)
deviceName, err := plugin.GetVolumeName(spec)
if err != nil {
t.Errorf("GetDeviceName error: %v", err)
}
if deviceName != name {
t.Errorf("GetDeviceName error: expected %s, got %s", name, deviceName)
}
}
func TestGetDeviceName_PersistentVolume(t *testing.T) {
plugin := newPlugin(t)
name := "my-pd-pv"
spec := createPVSpec(name, true, nil)
deviceName, err := plugin.GetVolumeName(spec)
if err != nil {
t.Errorf("GetDeviceName error: %v", err)
}
if deviceName != name {
t.Errorf("GetDeviceName error: expected %s, got %s", name, deviceName)
}
}
// One testcase for TestAttachDetach table test below
type testcase struct {
name string
// For fake GCE:
attach attachCall
detach detachCall
diskIsAttached diskIsAttachedCall
t *testing.T
// Actual test to run
test func(test *testcase) error
// Expected return of the test
expectedReturn error
}
func TestAttachDetachRegional(t *testing.T) {
diskName := "disk"
nodeName := types.NodeName("instance")
readOnly := false
regional := true
spec := createPVSpec(diskName, readOnly, []string{"zone1", "zone2"})
// Successful Attach call
testcase := testcase{
name: "Attach_Regional_Positive",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {diskName}}, nil},
attach: attachCall{diskName, nodeName, readOnly, regional, nil},
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
devicePath, err := attacher.Attach(spec, nodeName)
expectedDevicePath := filepath.FromSlash("/dev/disk/by-id/google-disk")
if devicePath != expectedDevicePath {
return fmt.Errorf("devicePath incorrect. Expected<\"%s\"> Actual: <%q>", expectedDevicePath, devicePath)
}
return err
},
}
err := testcase.test(&testcase)
if err != testcase.expectedReturn {
t.Errorf("%s failed: expected err=%v, got %v", testcase.name, testcase.expectedReturn, err)
}
}
func TestAttachDetach(t *testing.T) {
diskName := "disk"
nodeName := types.NodeName("instance")
readOnly := false
regional := false
spec := createVolSpec(diskName, readOnly)
attachError := errors.New("fake attach error")
detachError := errors.New("fake detach error")
diskCheckError := errors.New("fake DiskIsAttached error")
attachTestFunc := func(testcase *testcase) error {
attacher := newAttacher(testcase)
devicePath, err := attacher.Attach(spec, nodeName)
expectedDevicePath := filepath.FromSlash("/dev/disk/by-id/google-disk")
if devicePath != expectedDevicePath {
return fmt.Errorf("devicePath incorrect. Expected<\"%s\"> Actual: <%q>", expectedDevicePath, devicePath)
}
return err
}
tests := []testcase{
// Successful Attach call
{
name: "Attach_Positive",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {}}, nil},
attach: attachCall{diskName, nodeName, readOnly, regional, nil},
test: attachTestFunc,
},
// Disk is already attached
{
name: "Attach_Positive_AlreadyAttached",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {diskName}}, nil},
test: attachTestFunc,
},
// DiskIsAttached fails and Attach succeeds
{
name: "Attach_Positive_CheckFails",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {}}, diskCheckError},
attach: attachCall{diskName, nodeName, readOnly, regional, nil},
test: attachTestFunc,
},
// Attach call fails
{
name: "Attach_Negative",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {}}, diskCheckError},
attach: attachCall{diskName, nodeName, readOnly, regional, attachError},
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
devicePath, err := attacher.Attach(spec, nodeName)
if devicePath != "" {
return fmt.Errorf("devicePath incorrect. Expected<\"\"> Actual: <%q>", devicePath)
}
return err
},
expectedReturn: attachError,
},
// Detach succeeds
{
name: "Detach_Positive",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {diskName}}, nil},
detach: detachCall{diskName, nodeName, nil},
test: func(testcase *testcase) error {
detacher := newDetacher(testcase)
return detacher.Detach(diskName, nodeName)
},
},
// Disk is already detached
{
name: "Detach_Positive_AlreadyDetached",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {}}, nil},
test: func(testcase *testcase) error {
detacher := newDetacher(testcase)
return detacher.Detach(diskName, nodeName)
},
},
// Detach succeeds when DiskIsAttached fails
{
name: "Detach_Positive_CheckFails",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {}}, diskCheckError},
detach: detachCall{diskName, nodeName, nil},
test: func(testcase *testcase) error {
detacher := newDetacher(testcase)
return detacher.Detach(diskName, nodeName)
},
},
// Detach fails
{
name: "Detach_Negative",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName: {}}, diskCheckError},
detach: detachCall{diskName, nodeName, detachError},
test: func(testcase *testcase) error {
detacher := newDetacher(testcase)
return detacher.Detach(diskName, nodeName)
},
expectedReturn: detachError,
},
}
for _, testcase := range tests {
testcase.t = t
err := testcase.test(&testcase)
if err != testcase.expectedReturn {
t.Errorf("%s failed: expected err=%v, got %v", testcase.name, testcase.expectedReturn, err)
}
}
}
func TestVerifyVolumesAttached(t *testing.T) {
readOnly := false
nodeName1 := types.NodeName("instance1")
nodeName2 := types.NodeName("instance2")
diskAName := "diskA"
diskBName := "diskB"
diskCName := "diskC"
diskASpec := createVolSpec(diskAName, readOnly)
diskBSpec := createVolSpec(diskBName, readOnly)
diskCSpec := createVolSpec(diskCName, readOnly)
verifyDiskAttachedInResult := func(results map[*volume.Spec]bool, spec *volume.Spec, expected bool) error {
found, ok := results[spec]
if !ok {
return fmt.Errorf("expected to find volume %s in verifcation result, but didn't", spec.Name())
}
if found != expected {
return fmt.Errorf("expected to find volume %s to be have attached value %v but got %v", spec.Name(), expected, found)
}
return nil
}
tests := []testcase{
// Successful VolumesAreAttached
{
name: "VolumesAreAttached_Positive",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName1: {diskAName, diskBName}}, nil},
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
results, err := attacher.VolumesAreAttached([]*volume.Spec{diskASpec, diskBSpec}, nodeName1)
if err != nil {
return err
}
err = verifyDiskAttachedInResult(results, diskASpec, true)
if err != nil {
return err
}
return verifyDiskAttachedInResult(results, diskBSpec, true)
},
},
// Successful VolumesAreAttached for detached disk
{
name: "VolumesAreAttached_Negative",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName1: {diskAName}}, nil},
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
results, err := attacher.VolumesAreAttached([]*volume.Spec{diskASpec, diskBSpec}, nodeName1)
if err != nil {
return err
}
err = verifyDiskAttachedInResult(results, diskASpec, true)
if err != nil {
return err
}
return verifyDiskAttachedInResult(results, diskBSpec, false)
},
},
// VolumesAreAttached with InstanceNotFound
{
name: "VolumesAreAttached_InstanceNotFound",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{}, nil},
expectedReturn: cloudprovider.InstanceNotFound,
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
_, err := attacher.VolumesAreAttached([]*volume.Spec{diskASpec}, nodeName1)
if err != cloudprovider.InstanceNotFound {
return fmt.Errorf("expected InstanceNotFound error, but got %v", err)
}
return err
},
},
// Successful BulkDisksAreAttached
{
name: "BulkDisksAreAttached_Positive",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName1: {diskAName}, nodeName2: {diskBName, diskCName}}, nil},
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
results, err := attacher.BulkVerifyVolumes(map[types.NodeName][]*volume.Spec{nodeName1: {diskASpec}, nodeName2: {diskBSpec, diskCSpec}})
if err != nil {
return err
}
disksAttachedNode1, nodeFound := results[nodeName1]
if !nodeFound {
return fmt.Errorf("expected to find node %s but didn't", nodeName1)
}
if err := verifyDiskAttachedInResult(disksAttachedNode1, diskASpec, true); err != nil {
return err
}
disksAttachedNode2, nodeFound := results[nodeName2]
if !nodeFound {
return fmt.Errorf("expected to find node %s but didn't", nodeName2)
}
if err := verifyDiskAttachedInResult(disksAttachedNode2, diskBSpec, true); err != nil {
return err
}
return verifyDiskAttachedInResult(disksAttachedNode2, diskCSpec, true)
},
},
// Successful BulkDisksAreAttached for detached disk
{
name: "BulkDisksAreAttached_Negative",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName1: {}, nodeName2: {diskBName}}, nil},
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
results, err := attacher.BulkVerifyVolumes(map[types.NodeName][]*volume.Spec{nodeName1: {diskASpec}, nodeName2: {diskBSpec, diskCSpec}})
if err != nil {
return err
}
disksAttachedNode1, nodeFound := results[nodeName1]
if !nodeFound {
return fmt.Errorf("expected to find node %s but didn't", nodeName1)
}
if err := verifyDiskAttachedInResult(disksAttachedNode1, diskASpec, false); err != nil {
return err
}
disksAttachedNode2, nodeFound := results[nodeName2]
if !nodeFound {
return fmt.Errorf("expected to find node %s but didn't", nodeName2)
}
if err := verifyDiskAttachedInResult(disksAttachedNode2, diskBSpec, true); err != nil {
return err
}
return verifyDiskAttachedInResult(disksAttachedNode2, diskCSpec, false)
},
},
// Successful BulkDisksAreAttached with InstanceNotFound
{
name: "BulkDisksAreAttached_InstanceNotFound",
diskIsAttached: diskIsAttachedCall{disksAttachedMap{nodeName1: {diskAName}}, nil},
test: func(testcase *testcase) error {
attacher := newAttacher(testcase)
results, err := attacher.BulkVerifyVolumes(map[types.NodeName][]*volume.Spec{nodeName1: {diskASpec}, nodeName2: {diskBSpec, diskCSpec}})
if err != nil {
return err
}
disksAttachedNode1, nodeFound := results[nodeName1]
if !nodeFound {
return fmt.Errorf("expected to find node %s but didn't", nodeName1)
}
if err := verifyDiskAttachedInResult(disksAttachedNode1, diskASpec, true); err != nil {
return err
}
disksAttachedNode2, nodeFound := results[nodeName2]
if !nodeFound {
return fmt.Errorf("expected to find node %s but didn't", nodeName2)
}
if err := verifyDiskAttachedInResult(disksAttachedNode2, diskBSpec, false); err != nil {
return err
}
return verifyDiskAttachedInResult(disksAttachedNode2, diskCSpec, false)
},
},
}
for _, testcase := range tests {
testcase.t = t
err := testcase.test(&testcase)
if err != testcase.expectedReturn {
t.Errorf("%s failed: expected err=%v, got %v", testcase.name, testcase.expectedReturn, err)
}
}
}
// newPlugin creates a new gcePersistentDiskPlugin with fake cloud, NewAttacher
// and NewDetacher won't work.
func newPlugin(t *testing.T) *gcePersistentDiskPlugin {
host := volumetest.NewFakeVolumeHost(t,
os.TempDir(), /* rootDir */
nil, /* kubeClient */
nil, /* plugins */
)
plugins := ProbeVolumePlugins()
plugin := plugins[0]
plugin.Init(host)
return plugin.(*gcePersistentDiskPlugin)
}
func newAttacher(testcase *testcase) *gcePersistentDiskAttacher {
return &gcePersistentDiskAttacher{
host: nil,
gceDisks: testcase,
}
}
func newDetacher(testcase *testcase) *gcePersistentDiskDetacher {
return &gcePersistentDiskDetacher{
gceDisks: testcase,
}
}
func createVolSpec(name string, readOnly bool) *volume.Spec {
return &volume.Spec{
Volume: &v1.Volume{
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: name,
ReadOnly: readOnly,
},
},
},
}
}
func createPVSpec(name string, readOnly bool, zones []string) *volume.Spec {
spec := &volume.Spec{
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: name,
ReadOnly: readOnly,
},
},
},
},
}
if zones != nil {
zonesLabel := strings.Join(zones, cloudvolume.LabelMultiZoneDelimiter)
spec.PersistentVolume.ObjectMeta.Labels = map[string]string{
v1.LabelTopologyZone: zonesLabel,
}
}
return spec
}
// Fake GCE implementation
type attachCall struct {
diskName string
nodeName types.NodeName
readOnly bool
regional bool
retErr error
}
type detachCall struct {
devicePath string
nodeName types.NodeName
retErr error
}
type diskIsAttachedCall struct {
attachedDisks disksAttachedMap
retErr error
}
// disksAttachedMap specifies what disks in the test scenario are actually attached to each node
type disksAttachedMap map[types.NodeName][]string
func (testcase *testcase) AttachDisk(diskName string, nodeName types.NodeName, readOnly bool, regional bool) error {
expected := &testcase.attach
if expected.diskName == "" && expected.nodeName == "" {
// testcase.attach looks uninitialized, test did not expect to call AttachDisk
return errors.New("unexpected AttachDisk call")
}
if expected.diskName != diskName {
return fmt.Errorf("unexpected AttachDisk call: expected diskName %s, got %s", expected.diskName, diskName)
}
if expected.nodeName != nodeName {
return fmt.Errorf("unexpected AttachDisk call: expected nodeName %s, got %s", expected.nodeName, nodeName)
}
if expected.readOnly != readOnly {
return fmt.Errorf("unexpected AttachDisk call: expected readOnly %v, got %v", expected.readOnly, readOnly)
}
if expected.regional != regional {
return fmt.Errorf("unexpected AttachDisk call: expected regional %v, got %v", expected.regional, regional)
}
klog.V(4).Infof("AttachDisk call: %s, %s, %v, returning %v", diskName, nodeName, readOnly, expected.retErr)
return expected.retErr
}
func (testcase *testcase) DetachDisk(devicePath string, nodeName types.NodeName) error {
expected := &testcase.detach
if expected.devicePath == "" && expected.nodeName == "" {
// testcase.detach looks uninitialized, test did not expect to call DetachDisk
return errors.New("unexpected DetachDisk call")
}
if expected.devicePath != devicePath {
return fmt.Errorf("unexpected DetachDisk call: expected devicePath %s, got %s", expected.devicePath, devicePath)
}
if expected.nodeName != nodeName {
return fmt.Errorf("unexpected DetachDisk call: expected nodeName %s, got %s", expected.nodeName, nodeName)
}
klog.V(4).Infof("DetachDisk call: %s, %s, returning %v", devicePath, nodeName, expected.retErr)
return expected.retErr
}
func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) {
expected := &testcase.diskIsAttached
if expected.attachedDisks == nil {
// testcase.attachedDisks looks uninitialized, test did not expect to call DiskIsAttached
return false, errors.New("unexpected DiskIsAttached call")
}
if expected.retErr != nil {
return false, expected.retErr
}
disksForNode, nodeExists := expected.attachedDisks[nodeName]
if !nodeExists {
return false, cloudprovider.InstanceNotFound
}
found := false
for _, diskAttachedName := range disksForNode {
if diskAttachedName == diskName {
found = true
}
}
klog.V(4).Infof("DiskIsAttached call: %s, %s, returning %v", diskName, nodeName, found)
return found, nil
}
func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
verifiedDisks := make(map[string]bool)
for _, name := range diskNames {
found, err := testcase.DiskIsAttached(name, nodeName)
if err != nil {
return nil, err
}
verifiedDisks[name] = found
}
return verifiedDisks, nil
}
func (testcase *testcase) BulkDisksAreAttached(diskByNodes map[types.NodeName][]string) (map[types.NodeName]map[string]bool, error) {
verifiedDisksByNodes := make(map[types.NodeName]map[string]bool)
for nodeName, disksForNode := range diskByNodes {
verifiedDisks, err := testcase.DisksAreAttached(disksForNode, nodeName)
if err != nil {
if err != cloudprovider.InstanceNotFound {
return nil, err
}
verifiedDisks = make(map[string]bool)
for _, diskName := range disksForNode {
verifiedDisks[diskName] = false
}
}
verifiedDisksByNodes[nodeName] = verifiedDisks
}
return verifiedDisksByNodes, nil
}
func (testcase *testcase) CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) (*gce.Disk, error) {
return nil, errors.New("not implemented")
}
func (testcase *testcase) CreateRegionalDisk(name string, diskType string, replicaZones sets.String, sizeGb int64, tags map[string]string) (*gce.Disk, error) {
return nil, errors.New("not implemented")
}
func (testcase *testcase) DeleteDisk(diskToDelete string) error {
return errors.New("not implemented")
}
func (testcase *testcase) GetAutoLabelsForPD(*gce.Disk) (map[string]string, error) {
return map[string]string{}, errors.New("not implemented")
}
func (testcase *testcase) ResizeDisk(
diskName string,
oldSize resource.Quantity,
newSize resource.Quantity) (resource.Quantity, error) {
return oldSize, errors.New("not implemented")
}

View File

@ -1,19 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package gcepd contains the internal representation of GCE PersistentDisk
// volumes.
package gcepd // import "k8s.io/kubernetes/pkg/volume/gcepd"

View File

@ -1,568 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilstrings "k8s.io/utils/strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
gcecloud "k8s.io/legacy-cloud-providers/gce"
)
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&gcePersistentDiskPlugin{nil}}
}
type gcePersistentDiskPlugin struct {
host volume.VolumeHost
}
var _ volume.VolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.PersistentVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.DeletableVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.ProvisionableVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.ExpandableVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.VolumePluginWithAttachLimits = &gcePersistentDiskPlugin{}
const (
gcePersistentDiskPluginName = "kubernetes.io/gce-pd"
)
// The constants are used to map from the machine type (number of CPUs) to the limit of
// persistent disks that can be attached to an instance. Please refer to gcloud doc
// https://cloud.google.com/compute/docs/machine-types
// These constants are all the documented attach limit minus one because the
// node boot disk is considered an attachable disk so effective attach limit is
// one less.
const (
volumeLimitSmall = 15
volumeLimitBig = 127
)
func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(gcePersistentDiskPluginName), volName)
}
func (plugin *gcePersistentDiskPlugin) Init(host volume.VolumeHost) error {
plugin.host = host
return nil
}
func (plugin *gcePersistentDiskPlugin) GetPluginName() string {
return gcePersistentDiskPluginName
}
func (plugin *gcePersistentDiskPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
return volumeSource.PDName, nil
}
func (plugin *gcePersistentDiskPlugin) CanSupport(spec *volume.Spec) bool {
return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.GCEPersistentDisk != nil) ||
(spec.Volume != nil && spec.Volume.GCEPersistentDisk != nil)
}
func (plugin *gcePersistentDiskPlugin) RequiresRemount(spec *volume.Spec) bool {
return false
}
func (plugin *gcePersistentDiskPlugin) SupportsMountOption() bool {
return true
}
func (plugin *gcePersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
return true
}
func (plugin *gcePersistentDiskPlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
return false, nil
}
func (plugin *gcePersistentDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
v1.ReadOnlyMany,
}
}
func (plugin *gcePersistentDiskPlugin) GetVolumeLimits() (map[string]int64, error) {
volumeLimits := map[string]int64{
util.GCEVolumeLimitKey: volumeLimitSmall,
}
cloud := plugin.host.GetCloudProvider()
// if we can't fetch cloudprovider we return an error
// hoping external CCM or admin can set it. Returning
// default values from here will mean, no one can
// override them.
if cloud == nil {
return nil, fmt.Errorf("no cloudprovider present")
}
if cloud.ProviderName() != gcecloud.ProviderName {
return nil, fmt.Errorf("expected gce cloud got %s", cloud.ProviderName())
}
instances, ok := cloud.Instances()
if !ok {
klog.Warning("Failed to get instances from cloud provider")
return volumeLimits, nil
}
instanceType, err := instances.InstanceType(context.TODO(), plugin.host.GetNodeName())
if err != nil {
klog.Errorf("Failed to get instance type from GCE cloud provider")
return volumeLimits, nil
}
smallMachineTypes := []string{"f1-micro", "g1-small", "e2-micro", "e2-small", "e2-medium"}
for _, small := range smallMachineTypes {
if instanceType == small {
volumeLimits[util.GCEVolumeLimitKey] = volumeLimitSmall
return volumeLimits, nil
}
}
volumeLimits[util.GCEVolumeLimitKey] = volumeLimitBig
return volumeLimits, nil
}
func (plugin *gcePersistentDiskPlugin) VolumeLimitKey(spec *volume.Spec) string {
return util.GCEVolumeLimitKey
}
func (plugin *gcePersistentDiskPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
// Inject real implementations here, test through the internal function.
return plugin.newMounterInternal(spec, pod.UID, &GCEDiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
}
func getVolumeSource(
spec *volume.Spec) (*v1.GCEPersistentDiskVolumeSource, bool, error) {
if spec.Volume != nil && spec.Volume.GCEPersistentDisk != nil {
return spec.Volume.GCEPersistentDisk, spec.Volume.GCEPersistentDisk.ReadOnly, nil
} else if spec.PersistentVolume != nil &&
spec.PersistentVolume.Spec.GCEPersistentDisk != nil {
return spec.PersistentVolume.Spec.GCEPersistentDisk, spec.ReadOnly, nil
}
return nil, false, fmt.Errorf("spec does not reference a GCE volume type")
}
func (plugin *gcePersistentDiskPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Mounter, error) {
// GCEPDs used directly in a pod have a ReadOnly flag set by the pod author.
// GCEPDs used as a PersistentVolume gets the ReadOnly flag indirectly through the persistent-claim volume used to mount the PV
volumeSource, readOnly, err := getVolumeSource(spec)
if err != nil {
return nil, err
}
pdName := volumeSource.PDName
partition := ""
if volumeSource.Partition != 0 {
partition = strconv.Itoa(int(volumeSource.Partition))
}
return &gcePersistentDiskMounter{
gcePersistentDisk: &gcePersistentDisk{
podUID: podUID,
volName: spec.Name(),
pdName: pdName,
partition: partition,
mounter: mounter,
manager: manager,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
},
mountOptions: util.MountOptionFromSpec(spec),
readOnly: readOnly}, nil
}
func (plugin *gcePersistentDiskPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
// Inject real implementations here, test through the internal function.
return plugin.newUnmounterInternal(volName, podUID, &GCEDiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
}
func (plugin *gcePersistentDiskPlugin) newUnmounterInternal(volName string, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Unmounter, error) {
return &gcePersistentDiskUnmounter{&gcePersistentDisk{
podUID: podUID,
volName: volName,
manager: manager,
mounter: mounter,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
}}, nil
}
func (plugin *gcePersistentDiskPlugin) NewDeleter(logger klog.Logger, spec *volume.Spec) (volume.Deleter, error) {
return plugin.newDeleterInternal(spec, &GCEDiskUtil{})
}
func (plugin *gcePersistentDiskPlugin) newDeleterInternal(spec *volume.Spec, manager pdManager) (volume.Deleter, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.GCEPersistentDisk == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.GCEPersistentDisk is nil")
}
return &gcePersistentDiskDeleter{
gcePersistentDisk: &gcePersistentDisk{
volName: spec.Name(),
pdName: spec.PersistentVolume.Spec.GCEPersistentDisk.PDName,
manager: manager,
plugin: plugin,
}}, nil
}
func (plugin *gcePersistentDiskPlugin) NewProvisioner(logger klog.Logger, options volume.VolumeOptions) (volume.Provisioner, error) {
return plugin.newProvisionerInternal(options, &GCEDiskUtil{})
}
func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.VolumeOptions, manager pdManager) (volume.Provisioner, error) {
return &gcePersistentDiskProvisioner{
gcePersistentDisk: &gcePersistentDisk{
manager: manager,
plugin: plugin,
},
options: options,
}, nil
}
func (plugin *gcePersistentDiskPlugin) RequiresFSResize() bool {
return true
}
func (plugin *gcePersistentDiskPlugin) ExpandVolumeDevice(
spec *volume.Spec,
newSize resource.Quantity,
oldSize resource.Quantity) (resource.Quantity, error) {
cloud, err := getCloudProvider(plugin.host.GetCloudProvider())
if err != nil {
return oldSize, err
}
pdName := spec.PersistentVolume.Spec.GCEPersistentDisk.PDName
updatedQuantity, err := cloud.ResizeDisk(pdName, oldSize, newSize)
if err != nil {
return oldSize, err
}
return updatedQuantity, nil
}
func (plugin *gcePersistentDiskPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
fsVolume, err := util.CheckVolumeModeFilesystem(resizeOptions.VolumeSpec)
if err != nil {
return false, fmt.Errorf("error checking VolumeMode: %v", err)
}
// if volume is not a fs file system, there is nothing for us to do here.
if !fsVolume {
return true, nil
}
_, err = util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}
var _ volume.NodeExpandableVolumePlugin = &gcePersistentDiskPlugin{}
func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
if !ok {
return volume.ReconstructedVolume{}, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
}
hu := kvh.GetHostUtil()
pluginMntDir := util.GetPluginMountDir(plugin.host, plugin.GetPluginName())
sourceName, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
if err != nil {
return volume.ReconstructedVolume{}, err
}
gceVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: sourceName,
},
},
}
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(gceVolume),
}, nil
}
// Abstract interface to PD operations.
type pdManager interface {
// Creates a volume
CreateVolume(provisioner *gcePersistentDiskProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID string, volumeSizeGB int, labels map[string]string, fstype string, err error)
// Deletes a volume
DeleteVolume(deleter *gcePersistentDiskDeleter) error
}
// gcePersistentDisk volumes are disk resources provided by Google Compute Engine
// that are attached to the kubelet's host machine and exposed to the pod.
type gcePersistentDisk struct {
volName string
podUID types.UID
// Unique identifier of the PD, used to find the disk resource in the provider.
pdName string
// Specifies the partition to mount
partition string
// Utility interface to provision and delete disks
manager pdManager
// Mounter interface that provides system calls to mount the global path to the pod local path.
mounter mount.Interface
plugin *gcePersistentDiskPlugin
volume.MetricsProvider
}
type gcePersistentDiskMounter struct {
*gcePersistentDisk
// Specifies whether the disk will be mounted as read-only.
readOnly bool
mountOptions []string
}
var _ volume.Mounter = &gcePersistentDiskMounter{}
func (b *gcePersistentDiskMounter) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: b.readOnly,
Managed: !b.readOnly,
SELinuxRelabel: true,
}
}
// SetUp bind mounts the disk global mount to the volume path.
func (b *gcePersistentDiskMounter) SetUp(mounterArgs volume.MounterArgs) error {
return b.SetUpAt(b.GetPath(), mounterArgs)
}
// SetUp bind mounts the disk global mount to the give volume path.
func (b *gcePersistentDiskMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
// TODO: handle failed mounts here.
notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
klog.V(4).Infof("GCE PersistentDisk set up: Dir (%s) PD name (%q) Mounted (%t) Error (%v), ReadOnly (%t)", dir, b.pdName, !notMnt, err, b.readOnly)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("cannot validate mount point: %s %v", dir, err)
}
if !notMnt {
return nil
}
if runtime.GOOS != "windows" {
// in windows, we will use mklink to mount, will MkdirAll in Mount func
if err := os.MkdirAll(dir, 0750); err != nil {
return fmt.Errorf("mkdir failed on disk %s (%v)", dir, err)
}
}
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
options := []string{"bind"}
if b.readOnly {
options = append(options, "ro")
}
globalPDPath := makeGlobalPDName(b.plugin.host, b.pdName)
klog.V(4).Infof("attempting to mount %s", dir)
mountOptions := util.JoinMountOptions(b.mountOptions, options)
err = b.mounter.MountSensitiveWithoutSystemd(globalPDPath, dir, "", mountOptions, nil)
if err != nil {
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
return fmt.Errorf("failed to mount: %v. Cleanup IsLikelyNotMountPoint check failed: %v", err, mntErr)
}
if !notMnt {
if mntErr = b.mounter.Unmount(dir); mntErr != nil {
return fmt.Errorf("failed to mount: %v. Cleanup failed to unmount: %v", err, mntErr)
}
notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
return fmt.Errorf("failed to mount: %v. Cleanup IsLikelyNotMountPoint check failed: %v", err, mntErr)
}
if !notMnt {
// This is very odd, we don't expect it. We'll try again next sync loop.
return fmt.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop", dir)
}
}
mntErr = os.Remove(dir)
if mntErr != nil {
return fmt.Errorf("failed to mount: %v. Cleanup os Remove(%s) failed: %v", err, dir, mntErr)
}
return fmt.Errorf("mount of disk %s failed: %v", dir, err)
}
klog.V(4).Infof("mount of disk %s succeeded", dir)
if !b.readOnly {
if err := volume.SetVolumeOwnership(b, dir, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil)); err != nil {
klog.Errorf("SetVolumeOwnership returns error %v", err)
}
}
return nil
}
func makeGlobalPDName(host volume.VolumeHost, devName string) string {
return filepath.Join(host.GetPluginDir(gcePersistentDiskPluginName), util.MountsInGlobalPDPath, devName)
}
func (b *gcePersistentDiskMounter) GetPath() string {
return getPath(b.podUID, b.volName, b.plugin.host)
}
type gcePersistentDiskUnmounter struct {
*gcePersistentDisk
}
var _ volume.Unmounter = &gcePersistentDiskUnmounter{}
func (c *gcePersistentDiskUnmounter) GetPath() string {
return getPath(c.podUID, c.volName, c.plugin.host)
}
// Unmounts the bind mount, and detaches the disk only if the PD
// resource was the last reference to that disk on the kubelet.
func (c *gcePersistentDiskUnmounter) TearDown() error {
return c.TearDownAt(c.GetPath())
}
// TearDownAt unmounts the bind mount
func (c *gcePersistentDiskUnmounter) TearDownAt(dir string) error {
return mount.CleanupMountPoint(dir, c.mounter, false)
}
type gcePersistentDiskDeleter struct {
*gcePersistentDisk
}
var _ volume.Deleter = &gcePersistentDiskDeleter{}
func (d *gcePersistentDiskDeleter) GetPath() string {
return getPath(d.podUID, d.volName, d.plugin.host)
}
func (d *gcePersistentDiskDeleter) Delete() error {
return d.manager.DeleteVolume(d)
}
type gcePersistentDiskProvisioner struct {
*gcePersistentDisk
options volume.VolumeOptions
}
var _ volume.Provisioner = &gcePersistentDiskProvisioner{}
func (c *gcePersistentDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
if !util.ContainsAllAccessModes(c.plugin.GetAccessModes(), c.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", c.options.PVC.Spec.AccessModes, c.plugin.GetAccessModes())
}
volumeID, sizeGB, labels, fstype, err := c.manager.CreateVolume(c, selectedNode, allowedTopologies)
if err != nil {
return nil, err
}
if fstype == "" {
fstype = "ext4"
}
volumeMode := c.options.PVC.Spec.VolumeMode
if volumeMode != nil && *volumeMode == v1.PersistentVolumeBlock {
// Block volumes should not have any FSType
fstype = ""
}
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: c.options.PVName,
Labels: map[string]string{},
Annotations: map[string]string{
util.VolumeDynamicallyCreatedByKey: "gce-pd-dynamic-provisioner",
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: c.options.PersistentVolumeReclaimPolicy,
AccessModes: c.options.PVC.Spec.AccessModes,
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)),
},
VolumeMode: volumeMode,
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: volumeID,
Partition: 0,
ReadOnly: false,
FSType: fstype,
},
},
MountOptions: c.options.MountOptions,
},
}
if len(c.options.PVC.Spec.AccessModes) == 0 {
pv.Spec.AccessModes = c.plugin.GetAccessModes()
}
requirements := make([]v1.NodeSelectorRequirement, 0)
if len(labels) != 0 {
if pv.Labels == nil {
pv.Labels = make(map[string]string)
}
for k, v := range labels {
pv.Labels[k] = v
var values []string
if k == v1.LabelTopologyZone {
values, err = volumehelpers.LabelZonesToList(v)
if err != nil {
return nil, fmt.Errorf("failed to convert label string for Zone: %s to a List: %v", v, err)
}
} else {
values = []string{v}
}
requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values})
}
}
if len(requirements) > 0 {
pv.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
pv.Spec.NodeAffinity.Required = new(v1.NodeSelector)
pv.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions = requirements
}
return pv, nil
}

View File

@ -1,183 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import (
"fmt"
"path/filepath"
"strconv"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilstrings "k8s.io/utils/strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
var _ volume.VolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.PersistentVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.BlockVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.DeletableVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.ProvisionableVolumePlugin = &gcePersistentDiskPlugin{}
var _ volume.ExpandableVolumePlugin = &gcePersistentDiskPlugin{}
func (plugin *gcePersistentDiskPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName, mapPath string) (*volume.Spec, error) {
pluginDir := plugin.host.GetVolumeDevicePluginDir(gcePersistentDiskPluginName)
blkutil := volumepathhandler.NewBlockVolumePathHandler()
globalMapPathUUID, err := blkutil.FindGlobalMapPathUUIDFromPod(pluginDir, mapPath, podUID)
if err != nil {
return nil, err
}
klog.V(5).Infof("globalMapPathUUID: %v, err: %v", globalMapPathUUID, err)
globalMapPath := filepath.Dir(globalMapPathUUID)
if len(globalMapPath) <= 1 {
return nil, fmt.Errorf("failed to get volume plugin information from globalMapPathUUID: %v", globalMapPathUUID)
}
return getVolumeSpecFromGlobalMapPath(volumeName, globalMapPath)
}
func getVolumeSpecFromGlobalMapPath(volumeName, globalMapPath string) (*volume.Spec, error) {
// Get volume spec information from globalMapPath
// globalMapPath example:
// plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumeID}
// plugins/kubernetes.io/gce-pd/volumeDevices/vol-XXXXXX
pdName := filepath.Base(globalMapPath)
if len(pdName) <= 1 {
return nil, fmt.Errorf("failed to get pd name from global path=%s", globalMapPath)
}
block := v1.PersistentVolumeBlock
gceVolume := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: pdName,
},
},
VolumeMode: &block,
},
}
return volume.NewSpecFromPersistentVolume(gceVolume, true), nil
}
// NewBlockVolumeMapper creates a new volume.BlockVolumeMapper from an API specification.
func (plugin *gcePersistentDiskPlugin) NewBlockVolumeMapper(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
// If this is called via GenerateUnmapDeviceFunc(), pod is nil.
// Pass empty string as dummy uid since uid isn't used in the case.
var uid types.UID
if pod != nil {
uid = pod.UID
}
return plugin.newBlockVolumeMapperInternal(spec, uid, &GCEDiskUtil{}, plugin.host.GetMounter(plugin.GetPluginName()))
}
func (plugin *gcePersistentDiskPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.BlockVolumeMapper, error) {
volumeSource, readOnly, err := getVolumeSource(spec)
if err != nil {
return nil, err
}
pdName := volumeSource.PDName
partition := ""
if volumeSource.Partition != 0 {
partition = strconv.Itoa(int(volumeSource.Partition))
}
mapper := &gcePersistentDiskMapper{
gcePersistentDisk: &gcePersistentDisk{
volName: spec.Name(),
podUID: podUID,
pdName: pdName,
partition: partition,
manager: manager,
mounter: mounter,
plugin: plugin,
},
readOnly: readOnly,
}
blockPath, err := mapper.GetGlobalMapPath(spec)
if err != nil {
return nil, fmt.Errorf("failed to get device path: %v", err)
}
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
return mapper, nil
}
func (plugin *gcePersistentDiskPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
return plugin.newUnmapperInternal(volName, podUID, &GCEDiskUtil{})
}
func (plugin *gcePersistentDiskPlugin) newUnmapperInternal(volName string, podUID types.UID, manager pdManager) (volume.BlockVolumeUnmapper, error) {
return &gcePersistentDiskUnmapper{
gcePersistentDisk: &gcePersistentDisk{
volName: volName,
podUID: podUID,
pdName: volName,
manager: manager,
plugin: plugin,
}}, nil
}
type gcePersistentDiskUnmapper struct {
*gcePersistentDisk
}
var _ volume.BlockVolumeUnmapper = &gcePersistentDiskUnmapper{}
type gcePersistentDiskMapper struct {
*gcePersistentDisk
readOnly bool
}
var _ volume.BlockVolumeMapper = &gcePersistentDiskMapper{}
// GetGlobalMapPath returns global map path and error
// path: plugins/kubernetes.io/{PluginName}/volumeDevices/pdName
func (pd *gcePersistentDisk) GetGlobalMapPath(spec *volume.Spec) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
return "", err
}
return filepath.Join(pd.plugin.host.GetVolumeDevicePluginDir(gcePersistentDiskPluginName), string(volumeSource.PDName)), nil
}
// GetPodDeviceMapPath returns pod device map path and volume name
// path: pods/{podUid}/volumeDevices/kubernetes.io~aws
func (pd *gcePersistentDisk) GetPodDeviceMapPath() (string, string) {
name := gcePersistentDiskPluginName
return pd.plugin.host.GetPodVolumeDeviceDir(pd.podUID, utilstrings.EscapeQualifiedName(name)), pd.volName
}
// SupportsMetrics returns true for gcePersistentDisk as it initializes the
// MetricsProvider.
func (pd *gcePersistentDisk) SupportsMetrics() bool {
return true
}

View File

@ -1,151 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import (
"os"
"path/filepath"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
const (
testPdName = "pdVol1"
testPVName = "pv1"
testGlobalPath = "plugins/kubernetes.io/gce-pd/volumeDevices/pdVol1"
testPodPath = "pods/poduid/volumeDevices/kubernetes.io~gce-pd"
)
func TestGetVolumeSpecFromGlobalMapPath(t *testing.T) {
// make our test path for fake GlobalMapPath
// /tmp symbolized our pluginDir
// /tmp/testGlobalPathXXXXX/plugins/kubernetes.io/gce-pd/volumeDevices/pdVol1
tmpVDir, err := utiltesting.MkTmpdir("gceBlockTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
//deferred clean up
defer os.RemoveAll(tmpVDir)
expectedGlobalPath := filepath.Join(tmpVDir, testGlobalPath)
//Bad Path
badspec, err := getVolumeSpecFromGlobalMapPath("", "")
if badspec != nil || err == nil {
t.Errorf("Expected not to get spec from GlobalMapPath but did")
}
// Good Path
spec, err := getVolumeSpecFromGlobalMapPath("myVolume", expectedGlobalPath)
if spec == nil || err != nil {
t.Fatalf("Failed to get spec from GlobalMapPath: %v", err)
}
if spec.PersistentVolume.Name != "myVolume" {
t.Errorf("Invalid PV name from GlobalMapPath spec: %s", spec.PersistentVolume.Name)
}
if spec.PersistentVolume.Spec.GCEPersistentDisk.PDName != testPdName {
t.Errorf("Invalid pdName from GlobalMapPath spec: %s", spec.PersistentVolume.Spec.GCEPersistentDisk.PDName)
}
block := v1.PersistentVolumeBlock
specMode := spec.PersistentVolume.Spec.VolumeMode
if specMode == nil {
t.Fatalf("Failed to get volumeMode from PersistentVolumeBlock: %v", block)
}
if *specMode != block {
t.Errorf("Invalid volumeMode from GlobalMapPath spec: %v expected: %v", *specMode, block)
}
}
func getTestVolume(readOnly bool, path string, isBlock bool) *volume.Spec {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: testPVName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: testPdName,
},
},
},
}
if isBlock {
blockMode := v1.PersistentVolumeBlock
pv.Spec.VolumeMode = &blockMode
}
return volume.NewSpecFromPersistentVolume(pv, readOnly)
}
func TestGetPodAndPluginMapPaths(t *testing.T) {
tmpVDir, err := utiltesting.MkTmpdir("gceBlockTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
//deferred clean up
defer os.RemoveAll(tmpVDir)
expectedGlobalPath := filepath.Join(tmpVDir, testGlobalPath)
expectedPodPath := filepath.Join(tmpVDir, testPodPath)
spec := getTestVolume(false, tmpVDir, true /*isBlock*/)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(t, tmpVDir, nil, nil))
plug, err := plugMgr.FindMapperPluginByName(gcePersistentDiskPluginName)
if err != nil {
os.RemoveAll(tmpVDir)
t.Fatalf("Can't find the plugin by name: %q", gcePersistentDiskPluginName)
}
if plug.GetPluginName() != gcePersistentDiskPluginName {
t.Fatalf("Wrong name: %s", plug.GetPluginName())
}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}}
mapper, err := plug.NewBlockVolumeMapper(spec, pod, volume.VolumeOptions{})
if err != nil {
t.Fatalf("Failed to make a new Mounter: %v", err)
}
if mapper == nil {
t.Fatalf("Got a nil Mounter")
}
//GetGlobalMapPath
gMapPath, err := mapper.GetGlobalMapPath(spec)
if err != nil || len(gMapPath) == 0 {
t.Fatalf("Invalid GlobalMapPath from spec: %s", spec.PersistentVolume.Spec.GCEPersistentDisk.PDName)
}
if gMapPath != expectedGlobalPath {
t.Errorf("Failed to get GlobalMapPath: %s %s", gMapPath, expectedGlobalPath)
}
//GetPodDeviceMapPath
gDevicePath, gVolName := mapper.GetPodDeviceMapPath()
if gDevicePath != expectedPodPath {
t.Errorf("Got unexpected pod path: %s, expected %s", gDevicePath, expectedPodPath)
}
if gVolName != testPVName {
t.Errorf("Got unexpected volNamne: %s, expected %s", gVolName, testPVName)
}
}

View File

@ -1,372 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import (
"fmt"
"os"
"path/filepath"
"reflect"
goruntime "runtime"
"sort"
"testing"
"k8s.io/mount-utils"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
func TestCanSupport(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("gcepdTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd")
if err != nil {
t.Fatal("Can't find the plugin by name")
}
if plug.GetPluginName() != "kubernetes.io/gce-pd" {
t.Errorf("Wrong name: %s", plug.GetPluginName())
}
if !plug.CanSupport(&volume.Spec{Volume: &v1.Volume{VolumeSource: v1.VolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{}}}}) {
t.Errorf("Expected true")
}
if !plug.CanSupport(&volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{}}}}}) {
t.Errorf("Expected true")
}
}
func TestGetAccessModes(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("gcepdTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/gce-pd")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
if !volumetest.ContainsAccessMode(plug.GetAccessModes(), v1.ReadWriteOnce) || !volumetest.ContainsAccessMode(plug.GetAccessModes(), v1.ReadOnlyMany) {
t.Errorf("Expected two AccessModeTypes: %s and %s", v1.ReadWriteOnce, v1.ReadOnlyMany)
}
}
type fakePDManager struct {
}
func (fake *fakePDManager) CreateVolume(c *gcePersistentDiskProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (volumeID string, volumeSizeGB int, labels map[string]string, fstype string, err error) {
labels = make(map[string]string)
labels["fakepdmanager"] = "yes"
labels[v1.LabelTopologyZone] = "zone1__zone2"
return "test-gce-volume-name", 100, labels, "", nil
}
func (fake *fakePDManager) DeleteVolume(cd *gcePersistentDiskDeleter) error {
if cd.pdName != "test-gce-volume-name" {
return fmt.Errorf("Deleter got unexpected volume name: %s", cd.pdName)
}
return nil
}
func getNodeSelectorRequirementWithKey(key string, term v1.NodeSelectorTerm) (*v1.NodeSelectorRequirement, error) {
for _, r := range term.MatchExpressions {
if r.Key == key {
return &r, nil
}
}
return nil, fmt.Errorf("key %s not found", key)
}
func TestPlugin(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("gcepdTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
spec := &v1.Volume{
Name: "vol1",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "pd",
FSType: "ext4",
},
},
}
fakeManager := &fakePDManager{}
fakeMounter := mount.NewFakeMounter(nil)
mounter, err := plug.(*gcePersistentDiskPlugin).newMounterInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), fakeManager, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
volPath := filepath.Join(tmpDir, "pods/poduid/volumes/kubernetes.io~gce-pd/vol1")
path := mounter.GetPath()
if path != volPath {
t.Errorf("Got unexpected path: %s", path)
}
if err := mounter.SetUp(volume.MounterArgs{}); err != nil {
t.Errorf("Expected success, got: %v", err)
}
// On Windows, Mount will create the parent of dir and mklink (create a symbolic link) at the volume path later,
// so mounter.SetUp will not create the directory. Otherwise mklink will error: "Cannot create a file when that file already exists".
if goruntime.GOOS != "windows" {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", path)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
}
fakeManager = &fakePDManager{}
unmounter, err := plug.(*gcePersistentDiskPlugin).newUnmounterInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
if unmounter == nil {
t.Errorf("Got a nil Unmounter")
}
if err := unmounter.TearDown(); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(path); err == nil {
t.Errorf("TearDown() failed, volume path still exists: %s", path)
} else if !os.IsNotExist(err) {
t.Errorf("TearDown() failed: %v", err)
}
// Test Provisioner
options := volume.VolumeOptions{
PVC: volumetest.CreateTestPVC("100Mi", []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}),
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
}
provisioner, err := plug.(*gcePersistentDiskPlugin).newProvisionerInternal(options, &fakePDManager{})
if err != nil {
t.Errorf("Error creating new provisioner:%v", err)
}
persistentSpec, err := provisioner.Provision(nil, nil)
if err != nil {
t.Errorf("Provision() failed: %v", err)
}
if persistentSpec.Spec.PersistentVolumeSource.GCEPersistentDisk.PDName != "test-gce-volume-name" {
t.Errorf("Provision() returned unexpected volume ID: %s", persistentSpec.Spec.PersistentVolumeSource.GCEPersistentDisk.PDName)
}
cap := persistentSpec.Spec.Capacity[v1.ResourceStorage]
size := cap.Value()
if size != 100*volumehelpers.GiB {
t.Errorf("Provision() returned unexpected volume size: %v", size)
}
if persistentSpec.Labels["fakepdmanager"] != "yes" {
t.Errorf("Provision() returned unexpected value for fakepdmanager: %v", persistentSpec.Labels["fakepdmanager"])
}
if persistentSpec.Labels[v1.LabelTopologyZone] != "zone1__zone2" {
t.Errorf("Provision() returned unexpected value for %s: %v", v1.LabelTopologyZone, persistentSpec.Labels[v1.LabelTopologyZone])
}
if persistentSpec.Spec.NodeAffinity == nil {
t.Errorf("Unexpected nil NodeAffinity found")
}
if len(persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms) != 1 {
t.Errorf("Unexpected number of NodeSelectorTerms")
}
term := persistentSpec.Spec.NodeAffinity.Required.NodeSelectorTerms[0]
if len(term.MatchExpressions) != 2 {
t.Errorf("Unexpected number of NodeSelectorRequirements in volume NodeAffinity: %d", len(term.MatchExpressions))
}
r, _ := getNodeSelectorRequirementWithKey("fakepdmanager", term)
if r == nil || r.Values[0] != "yes" || r.Operator != v1.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement fakepdmanager-in-yes not found in volume NodeAffinity")
}
zones, _ := volumehelpers.ZonesToSet("zone1,zone2")
r, _ = getNodeSelectorRequirementWithKey(v1.LabelTopologyZone, term)
if r == nil {
t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", v1.LabelTopologyZone, zones)
} else {
sort.Strings(r.Values)
if !reflect.DeepEqual(r.Values, zones.List()) {
t.Errorf("ZoneFailureDomain elements %v does not match zone labels %v", r.Values, zones)
}
}
// Test Deleter
volSpec := &volume.Spec{
PersistentVolume: persistentSpec,
}
deleter, err := plug.(*gcePersistentDiskPlugin).newDeleterInternal(volSpec, &fakePDManager{})
if err != nil {
t.Errorf("Error creating new deleter:%v", err)
}
err = deleter.Delete()
if err != nil {
t.Errorf("Deleter() failed: %v", err)
}
}
func TestMountOptions(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("gcepdTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd")
if err != nil {
t.Errorf("Can't find the plugin by name")
}
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pvA",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{},
},
ClaimRef: &v1.ObjectReference{
Name: "claimA",
},
MountOptions: []string{"_netdev"},
},
}
fakeManager := &fakePDManager{}
fakeMounter := mount.NewFakeMounter(nil)
mounter, err := plug.(*gcePersistentDiskPlugin).newMounterInternal(volume.NewSpecFromPersistentVolume(pv, false), types.UID("poduid"), fakeManager, fakeMounter)
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Errorf("Got a nil Mounter")
}
if err := mounter.SetUp(volume.MounterArgs{}); err != nil {
t.Errorf("Expected success, got: %v", err)
}
mountOptions := fakeMounter.MountPoints[0].Opts
expectedMountOptions := []string{"_netdev", "bind"}
if !reflect.DeepEqual(mountOptions, expectedMountOptions) {
t.Errorf("Expected mount options to be %v got %v", expectedMountOptions, mountOptions)
}
}
func TestPersistentClaimReadOnlyFlag(t *testing.T) {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pvA",
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{},
},
ClaimRef: &v1.ObjectReference{
Name: "claimA",
},
},
}
claim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "claimA",
Namespace: "nsA",
},
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: "pvA",
},
Status: v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimBound,
},
}
client := fake.NewSimpleClientset(pv, claim)
tmpDir, err := utiltesting.MkTmpdir("gcepdTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, client, nil))
plug, _ := plugMgr.FindPluginByName(gcePersistentDiskPluginName)
// readOnly bool is supplied by persistent-claim volume source when its mounter creates other volumes
spec := volume.NewSpecFromPersistentVolume(pv, true)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}}
mounter, _ := plug.NewMounter(spec, pod, volume.VolumeOptions{})
if mounter == nil {
t.Fatalf("Got a nil Mounter")
}
if !mounter.GetAttributes().ReadOnly {
t.Errorf("Expected true for mounter.IsReadOnly")
}
}
func TestUnsupportedVolumeHost(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("gcepdTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(t, tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName("kubernetes.io/gce-pd")
if err != nil {
t.Fatal("Can't find the plugin by name")
}
_, err = plug.ConstructVolumeSpec("", "")
if err == nil {
t.Errorf("Expected failure constructing volume spec with unsupported VolumeHost")
}
}

View File

@ -1,368 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import (
"fmt"
"path/filepath"
"regexp"
"strings"
"time"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
"k8s.io/utils/exec"
utilpath "k8s.io/utils/path"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
cloudvolume "k8s.io/cloud-provider/volume"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
gcecloud "k8s.io/legacy-cloud-providers/gce"
)
const (
diskByIDPath = "/dev/disk/by-id/"
diskGooglePrefix = "google-"
diskScsiGooglePrefix = "scsi-0Google_PersistentDisk_"
diskPartitionSuffix = "-part"
diskSDPath = "/dev/sd"
diskSDPattern = "/dev/sd*"
maxRetries = 10
checkSleepDuration = time.Second
maxRegionalPDZones = 2
// Replication type constants must be lower case.
replicationTypeNone = "none"
replicationTypeRegionalPD = "regional-pd"
// scsi_id output should be in the form of:
// 0Google PersistentDisk <disk name>
scsiPattern = `^0Google\s+PersistentDisk\s+([\S]+)\s*$`
)
var (
// errorSleepDuration is modified only in unit tests and should be constant
// otherwise.
errorSleepDuration = 5 * time.Second
// regex to parse scsi_id output and extract the serial
scsiRegex = regexp.MustCompile(scsiPattern)
)
// GCEDiskUtil provides operation for GCE PD
type GCEDiskUtil struct{}
// DeleteVolume deletes a GCE PD
// Returns: error
func (util *GCEDiskUtil) DeleteVolume(d *gcePersistentDiskDeleter) error {
cloud, err := getCloudProvider(d.gcePersistentDisk.plugin.host.GetCloudProvider())
if err != nil {
return err
}
if err = cloud.DeleteDisk(d.pdName); err != nil {
klog.V(2).Infof("Error deleting GCE PD volume %s: %v", d.pdName, err)
// GCE cloud provider returns volume.deletedVolumeInUseError when
// necessary, no handling needed here.
return err
}
klog.V(2).Infof("Successfully deleted GCE PD volume %s", d.pdName)
return nil
}
// CreateVolume creates a GCE PD.
// Returns: gcePDName, volumeSizeGB, labels, fsType, error
func (util *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (string, int, map[string]string, string, error) {
cloud, err := getCloudProvider(c.gcePersistentDisk.plugin.host.GetCloudProvider())
if err != nil {
return "", 0, nil, "", err
}
name := volumeutil.GenerateVolumeName(c.options.ClusterName, c.options.PVName, 63) // GCE PD name can have up to 63 characters
capacity := c.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
// GCE PDs are allocated in chunks of GiBs
requestGB, err := volumehelpers.RoundUpToGiB(capacity)
if err != nil {
return "", 0, nil, "", err
}
// Apply Parameters.
// Values for parameter "replication-type" are canonicalized to lower case.
// Values for other parameters are case-insensitive, and we leave validation of these values
// to the cloud provider.
diskType := ""
configuredZone := ""
var configuredZones sets.String
zonePresent := false
zonesPresent := false
replicationType := replicationTypeNone
fstype := ""
for k, v := range c.options.Parameters {
switch strings.ToLower(k) {
case "type":
diskType = v
case "zone":
zonePresent = true
configuredZone = v
case "zones":
zonesPresent = true
configuredZones, err = volumehelpers.ZonesToSet(v)
if err != nil {
return "", 0, nil, "", err
}
case "replication-type":
replicationType = strings.ToLower(v)
case volume.VolumeParameterFSType:
fstype = v
default:
return "", 0, nil, "", fmt.Errorf("invalid option %q for volume plugin %s", k, c.plugin.GetPluginName())
}
}
// TODO: implement PVC.Selector parsing
if c.options.PVC.Spec.Selector != nil {
return "", 0, nil, "", fmt.Errorf("claim.Spec.Selector is not supported for dynamic provisioning on GCE")
}
var activezones sets.String
activezones, err = cloud.GetAllCurrentZones()
if err != nil {
return "", 0, nil, "", err
}
var disk *gcecloud.Disk
switch replicationType {
case replicationTypeRegionalPD:
selectedZones, err := volumehelpers.SelectZonesForVolume(zonePresent, zonesPresent, configuredZone, configuredZones, activezones, node, allowedTopologies, c.options.PVC.Name, maxRegionalPDZones)
if err != nil {
klog.V(2).Infof("Error selecting zones for regional GCE PD volume: %v", err)
return "", 0, nil, "", err
}
disk, err = cloud.CreateRegionalDisk(
name,
diskType,
selectedZones,
requestGB,
*c.options.CloudTags)
if err != nil {
klog.V(2).Infof("Error creating regional GCE PD volume: %v", err)
return "", 0, nil, "", err
}
klog.V(2).Infof("Successfully created Regional GCE PD volume %s", name)
case replicationTypeNone:
selectedZone, err := volumehelpers.SelectZoneForVolume(zonePresent, zonesPresent, configuredZone, configuredZones, activezones, node, allowedTopologies, c.options.PVC.Name)
if err != nil {
return "", 0, nil, "", err
}
disk, err = cloud.CreateDisk(
name,
diskType,
selectedZone,
requestGB,
*c.options.CloudTags)
if err != nil {
klog.V(2).Infof("Error creating single-zone GCE PD volume: %v", err)
return "", 0, nil, "", err
}
klog.V(2).Infof("Successfully created single-zone GCE PD volume %s", name)
default:
return "", 0, nil, "", fmt.Errorf("replication-type of '%s' is not supported", replicationType)
}
labels, err := cloud.GetAutoLabelsForPD(disk)
if err != nil {
// We don't really want to leak the volume here...
klog.Errorf("error getting labels for volume %q: %v", name, err)
}
return name, int(requestGB), labels, fstype, nil
}
// Returns the first path that exists, or empty string if none exist.
func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String, diskName string) (string, error) {
if err := udevadmChangeToNewDrives(sdBeforeSet); err != nil {
// It's possible udevadm was called on other disks so it should not block this
// call. If it did fail on this disk, then the devicePath will either
// not exist or be wrong. If it's wrong, then the scsi_id check below will fail.
klog.Errorf("udevadmChangeToNewDrives failed with: %v", err)
}
for _, path := range devicePaths {
if pathExists, err := mount.PathExists(path); err != nil {
return "", fmt.Errorf("error checking if path exists: %v", err)
} else if pathExists {
// validate that the path actually resolves to the correct disk
serial, err := getScsiSerial(path, diskName)
if err != nil {
return "", fmt.Errorf("failed to get scsi serial %v", err)
}
if serial != diskName {
// The device link is not pointing to the correct device
// Trigger udev on this device to try to fix the link
if udevErr := udevadmChangeToDrive(path); udevErr != nil {
klog.Errorf("udevadmChangeToDrive %q failed with: %v", path, err)
}
// Return error to retry WaitForAttach and verifyDevicePath
return "", fmt.Errorf("scsi_id serial %q for device %q doesn't match disk %q", serial, path, diskName)
}
// The device link is correct
return path, nil
}
}
return "", nil
}
// Calls scsi_id on the given devicePath to get the serial number reported by that device.
func getScsiSerial(devicePath, diskName string) (string, error) {
exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, "/lib/udev/scsi_id")
if err != nil {
return "", fmt.Errorf("failed to check scsi_id existence: %v", err)
}
if !exists {
klog.V(6).Infof("scsi_id doesn't exist; skipping check for %v", devicePath)
return diskName, nil
}
out, err := exec.New().Command(
"/lib/udev/scsi_id",
"--page=0x83",
"--whitelisted",
fmt.Sprintf("--device=%v", devicePath)).CombinedOutput()
if err != nil {
return "", fmt.Errorf("scsi_id failed for device %q with %v", devicePath, err)
}
return parseScsiSerial(string(out))
}
// Parse the output returned by scsi_id and extract the serial number
func parseScsiSerial(output string) (string, error) {
substrings := scsiRegex.FindStringSubmatch(output)
if substrings == nil {
return "", fmt.Errorf("scsi_id output cannot be parsed: %q", output)
}
return substrings[1], nil
}
// Returns list of all /dev/disk/by-id/* paths for given PD.
func getDiskByIDPaths(pdName string, partition string) []string {
devicePaths := []string{
filepath.Join(diskByIDPath, diskGooglePrefix+pdName),
filepath.Join(diskByIDPath, diskScsiGooglePrefix+pdName),
}
if partition != "" {
for i, path := range devicePaths {
devicePaths[i] = path + diskPartitionSuffix + partition
}
}
return devicePaths
}
// Return cloud provider
func getCloudProvider(cloudProvider cloudprovider.Interface) (*gcecloud.Cloud, error) {
var err error
for numRetries := 0; numRetries < maxRetries; numRetries++ {
gceCloudProvider, ok := cloudProvider.(*gcecloud.Cloud)
if !ok || gceCloudProvider == nil {
// Retry on error. See issue #11321
klog.Errorf("Failed to get GCE Cloud Provider. plugin.host.GetCloudProvider returned %v instead", cloudProvider)
time.Sleep(errorSleepDuration)
continue
}
return gceCloudProvider, nil
}
return nil, fmt.Errorf("failed to get GCE GCECloudProvider with error %v", err)
}
// Triggers the application of udev rules by calling "udevadm trigger
// --action=change" for newly created "/dev/sd*" drives (exist only in
// after set). This is workaround for Issue #7972. Once the underlying
// issue has been resolved, this may be removed.
func udevadmChangeToNewDrives(sdBeforeSet sets.String) error {
sdAfter, err := filepath.Glob(diskSDPattern)
if err != nil {
return fmt.Errorf("error filepath.Glob(\"%s\"): %v\r", diskSDPattern, err)
}
for _, sd := range sdAfter {
if !sdBeforeSet.Has(sd) {
return udevadmChangeToDrive(sd)
}
}
return nil
}
// Calls "udevadm trigger --action=change" on the specified drive.
// drivePath must be the block device path to trigger on, in the format "/dev/sd*", or a symlink to it.
// This is workaround for Issue #7972. Once the underlying issue has been resolved, this may be removed.
func udevadmChangeToDrive(drivePath string) error {
klog.V(5).Infof("udevadmChangeToDrive: drive=%q", drivePath)
// Evaluate symlink, if any
drive, err := filepath.EvalSymlinks(drivePath)
if err != nil {
return fmt.Errorf("udevadmChangeToDrive: filepath.EvalSymlinks(%q) failed with %v", drivePath, err)
}
klog.V(5).Infof("udevadmChangeToDrive: symlink path is %q", drive)
// Check to make sure input is "/dev/sd*"
if !strings.Contains(drive, diskSDPath) {
return fmt.Errorf("udevadmChangeToDrive: expected input in the form \"%s\" but drive is %q", diskSDPattern, drive)
}
// Call "udevadm trigger --action=change --property-match=DEVNAME=/dev/sd..."
_, err = exec.New().Command(
"udevadm",
"trigger",
"--action=change",
fmt.Sprintf("--property-match=DEVNAME=%s", drive)).CombinedOutput()
if err != nil {
return fmt.Errorf("udevadmChangeToDrive: udevadm trigger failed for drive %q with %v", drive, err)
}
return nil
}
// Checks whether the given GCE PD volume spec is associated with a regional PD.
func isRegionalPD(spec *volume.Spec) bool {
if spec.PersistentVolume != nil {
zonesLabel := spec.PersistentVolume.Labels[v1.LabelTopologyZone]
if zonesLabel == "" {
zonesLabel = spec.PersistentVolume.Labels[v1.LabelFailureDomainBetaZone]
}
zones := strings.Split(zonesLabel, cloudvolume.LabelMultiZoneDelimiter)
return len(zones) > 1
}
return false
}

View File

@ -1,65 +0,0 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gcepd
import "testing"
func TestParseScsiSerial(t *testing.T) {
cases := []struct {
name string
output string
diskName string
expectErr bool
}{
{
name: "valid",
output: "0Google PersistentDisk test-disk",
diskName: "test-disk",
},
{
name: "valid with newline",
output: "0Google PersistentDisk test-disk\n",
diskName: "test-disk",
},
{
name: "invalid prefix",
output: "00Google PersistentDisk test-disk",
expectErr: true,
},
{
name: "invalid suffix",
output: "0Google PersistentDisk test-disk more",
expectErr: true,
},
}
for _, test := range cases {
serial, err := parseScsiSerial(test.output)
if err != nil && !test.expectErr {
t.Errorf("test %v failed: %v", test.name, err)
}
if err == nil && test.expectErr {
t.Errorf("test %q failed: got success", test.name)
}
if serial != test.diskName {
t.Errorf("test %v failed: expected serial %q, got %q", test.name, test.diskName, serial)
}
}
}

View File

@ -36,10 +36,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
"k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/volume"
csitesting "k8s.io/kubernetes/pkg/volume/csi/testing"
"k8s.io/kubernetes/pkg/volume/gcepd"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
@ -58,12 +56,12 @@ func TestOperationGenerator_GenerateUnmapVolumeFunc_PluginName(t *testing.T) {
testcases := []testcase{
{
name: "gce pd plugin: csi migration disabled",
pluginName: plugins.GCEPDInTreePluginName,
pluginName: "fake-plugin",
pvSpec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{},
}},
probVolumePlugins: gcepd.ProbeVolumePlugins(),
probVolumePlugins: volumetesting.ProbeVolumePlugins(volume.VolumeConfig{}),
},
}