mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Add initial support for Volumes to AWS
This commit is contained in:
parent
d2b6920a32
commit
edf0292d4a
@ -17,10 +17,13 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
goruntime "runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/test/e2e"
|
||||
"github.com/golang/glog"
|
||||
@ -47,6 +50,7 @@ func init() {
|
||||
flag.StringVar(&context.Host, "host", "", "The host, or apiserver, to connect to")
|
||||
flag.StringVar(&context.RepoRoot, "repo_root", "./", "Root directory of kubernetes repository, for finding test files. Default assumes working directory is repository root")
|
||||
flag.StringVar(&context.Provider, "provider", "", "The name of the Kubernetes provider (gce, gke, local, vagrant, etc.)")
|
||||
|
||||
flag.StringVar(&gceConfig.MasterName, "kube_master", "", "Name of the kubernetes master. Only required if provider is gce or gke")
|
||||
flag.StringVar(&gceConfig.ProjectID, "gce_project", "", "The GCE project being used, if applicable")
|
||||
flag.StringVar(&gceConfig.Zone, "gce_zone", "", "GCE zone being used, if applicable")
|
||||
@ -63,5 +67,23 @@ func main() {
|
||||
glog.Error("Invalid --times (negative or no testing requested)!")
|
||||
os.Exit(1)
|
||||
}
|
||||
cloudConfig := &e2e.CloudConfig{
|
||||
ProjectID: *gceProject,
|
||||
Zone: *gceZone,
|
||||
MasterName: *masterName,
|
||||
}
|
||||
|
||||
if *provider == "aws" {
|
||||
awsConfig := "[Global]\n"
|
||||
awsConfig += fmt.Sprintf("Region=%s\n", *gceZone)
|
||||
|
||||
var err error
|
||||
cloudConfig.Provider, err = cloudprovider.GetCloudProvider(*provider, strings.NewReader(awsConfig))
|
||||
if err != nil {
|
||||
glog.Error("Error building AWS provider: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
e2e.RunE2ETests(context, *orderseed, *times, *reportDir, testList)
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network/exec"
|
||||
// Volume plugins
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/aws_pd"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/empty_dir"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/gce_pd"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/git_repo"
|
||||
@ -49,6 +50,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
|
||||
// The list of plugins to probe is decided by the kubelet binary, not
|
||||
// by dynamic linking or other "magic". Plugins will be analyzed and
|
||||
// initialized later.
|
||||
allPlugins = append(allPlugins, aws_pd.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
|
||||
|
@ -189,6 +189,9 @@ type VolumeSource struct {
|
||||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk"`
|
||||
// AWSPersistentDisk represents a AWS EBS disk that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo"`
|
||||
// Secret represents a secret that should populate this volume.
|
||||
@ -208,6 +211,9 @@ type PersistentVolumeSource struct {
|
||||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk"`
|
||||
// AWSPersistentDisk represents a AWS EBS disk that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way
|
||||
@ -384,11 +390,31 @@ type ISCSIVolumeSource struct {
|
||||
IQN string `json:"iqn,omitempty"`
|
||||
// Required: iSCSI target lun number
|
||||
Lun int `json:"lun,omitempty"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty"`
|
||||
}
|
||||
|
||||
// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS EBS disk must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS EBS disk can only be mounted as read/write once.
|
||||
type AWSPersistentDiskVolumeSource struct {
|
||||
// Unique name of the PD resource. Used to identify the disk in AWS
|
||||
PDName string `json:"pdName"`
|
||||
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
FSType string `json:"fsType,omitempty"`
|
||||
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field is 0 or empty.
|
||||
Partition int `json:"partition,omitempty"`
|
||||
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty"`
|
||||
|
@ -1170,6 +1170,9 @@ func init() {
|
||||
if err := s.Convert(&in.ISCSI, &out.ISCSI, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.AWSPersistentDisk, &out.AWSPersistentDisk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.HostPath, &out.HostDir, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1194,7 +1197,11 @@ func init() {
|
||||
if err := s.Convert(&in.GCEPersistentDisk, &out.GCEPersistentDisk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
<<<<<<< HEAD
|
||||
if err := s.Convert(&in.ISCSI, &out.ISCSI, 0); err != nil {
|
||||
=======
|
||||
if err := s.Convert(&in.AWSPersistentDisk, &out.AWSPersistentDisk, 0); err != nil {
|
||||
>>>>>>> Add initial support for Volumes to AWS
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.HostDir, &out.HostPath, 0); err != nil {
|
||||
|
@ -88,7 +88,7 @@ type Volume struct {
|
||||
// Source represents the location and type of a volume to mount.
|
||||
// This is optional for now. If not specified, the Volume is implied to be an EmptyDir.
|
||||
// This implied behavior is deprecated and will be removed in a future version.
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, or GitRepo; default is EmptyDir"`
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, AWSPersistentDisk, or GitRepo; default is EmptyDir"`
|
||||
}
|
||||
|
||||
// VolumeSource represents the source location of a volume to mount.
|
||||
@ -105,6 +105,9 @@ type VolumeSource struct {
|
||||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource attached to the host machine on demand"`
|
||||
// AWSPersistentDisk represents a AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource attached to the host machine on demand"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"`
|
||||
// Secret represents a secret to populate the volume with
|
||||
@ -124,6 +127,9 @@ type PersistentVolumeSource struct {
|
||||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource provisioned by an admin"`
|
||||
// AWSPersistentDisk represents a AWS EBS volume that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource provisioned by an admin"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way.
|
||||
@ -302,6 +308,29 @@ type ISCSIVolumeSource struct {
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS PD must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS PD can only be mounted on a single machine.
|
||||
type AWSPersistentDiskVolumeSource struct {
|
||||
// Unique name of the PD resource. Used to identify the disk in AWS
|
||||
PDName string `json:"pdName" description:"unique id of the PD resource in AWS"`
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
// TODO: why omitempty if required?
|
||||
FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"`
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty.
|
||||
Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// GitRepoVolumeSource represents a volume that is pulled from git when the pod is created.
|
||||
type GitRepoVolumeSource struct {
|
||||
// Repository URL
|
||||
|
@ -1097,6 +1097,9 @@ func init() {
|
||||
if err := s.Convert(&in.GCEPersistentDisk, &out.GCEPersistentDisk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.AWSPersistentDisk, &out.AWSPersistentDisk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.HostPath, &out.HostDir, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ type Volume struct {
|
||||
// Source represents the location and type of a volume to mount.
|
||||
// This is optional for now. If not specified, the Volume is implied to be an EmptyDir.
|
||||
// This implied behavior is deprecated and will be removed in a future version.
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, or GitRepo; default is EmptyDir"`
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, AWSPersistentDisk, or GitRepo; default is EmptyDir"`
|
||||
}
|
||||
|
||||
// VolumeSource represents the source location of a volume to mount.
|
||||
@ -74,6 +74,9 @@ type VolumeSource struct {
|
||||
// A persistent disk that is mounted to the
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource attached to the host machine on demand"`
|
||||
// An AWS persistent disk that is mounted to the
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource attached to the host machine on demand"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"`
|
||||
// Secret is a secret to populate the volume with
|
||||
@ -93,6 +96,9 @@ type PersistentVolumeSource struct {
|
||||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource provisioned by an admin"`
|
||||
// AWSPersistentDisk represents a AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource provisioned by an admin"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way.
|
||||
@ -284,6 +290,29 @@ type GCEPersistentDiskVolumeSource struct {
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS PD must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS PD can only be mounted on a single machine.
|
||||
type AWSPersistentDiskVolumeSource struct {
|
||||
// Unique name of the PD resource. Used to identify the disk in AWS
|
||||
PDName string `json:"pdName" description:"unique id of the PD resource in AWS"`
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
// TODO: why omitempty if required?
|
||||
FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"`
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty.
|
||||
Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// GitRepoVolumeSource represents a volume that is pulled from git when the pod is created.
|
||||
type GitRepoVolumeSource struct {
|
||||
// Repository URL
|
||||
|
@ -206,6 +206,9 @@ type VolumeSource struct {
|
||||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk" description:"GCE disk resource attached to the host machine on demand"`
|
||||
// AWSPersistentDisk represents a AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource attached to the host machine on demand"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"`
|
||||
// Secret represents a secret that should populate this volume.
|
||||
@ -225,6 +228,9 @@ type PersistentVolumeSource struct {
|
||||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk" description:"GCE disk resource provisioned by an admin"`
|
||||
// AWSPersistentDisk represents a AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource provisioned by an admin"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way.
|
||||
@ -400,6 +406,29 @@ type GCEPersistentDiskVolumeSource struct {
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS PD must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS PD can only be mounted on a single machine.
|
||||
type AWSPersistentDiskVolumeSource struct {
|
||||
// Unique name of the PD resource. Used to identify the disk in AWS
|
||||
PDName string `json:"pdName" description:"unique id of the PD resource in AWS"`
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
// TODO: why omitempty if required?
|
||||
FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"`
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty.
|
||||
Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// GitRepoVolumeSource represents a volume that is pulled from git when the pod is created.
|
||||
type GitRepoVolumeSource struct {
|
||||
// Repository URL
|
||||
|
@ -299,6 +299,10 @@ func validateSource(source *api.VolumeSource) errs.ValidationErrorList {
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateGCEPersistentDiskVolumeSource(source.GCEPersistentDisk).Prefix("persistentDisk")...)
|
||||
}
|
||||
if source.AWSPersistentDisk != nil {
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateAWSPersistentDiskVolumeSource(source.AWSPersistentDisk).Prefix("awsPersistentDisk")...)
|
||||
}
|
||||
if source.Secret != nil {
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateSecretVolumeSource(source.Secret).Prefix("secret")...)
|
||||
@ -368,6 +372,20 @@ func validateGCEPersistentDiskVolumeSource(PD *api.GCEPersistentDiskVolumeSource
|
||||
return allErrs
|
||||
}
|
||||
|
||||
func validateAWSPersistentDiskVolumeSource(PD *api.AWSPersistentDiskVolumeSource) errs.ValidationErrorList {
|
||||
allErrs := errs.ValidationErrorList{}
|
||||
if PD.PDName == "" {
|
||||
allErrs = append(allErrs, errs.NewFieldRequired("pdName"))
|
||||
}
|
||||
if PD.FSType == "" {
|
||||
allErrs = append(allErrs, errs.NewFieldRequired("fsType"))
|
||||
}
|
||||
if PD.Partition < 0 || PD.Partition > 255 {
|
||||
allErrs = append(allErrs, errs.NewFieldInvalid("partition", PD.Partition, pdPartitionErrorMsg))
|
||||
}
|
||||
return allErrs
|
||||
}
|
||||
|
||||
func validateSecretVolumeSource(secretSource *api.SecretVolumeSource) errs.ValidationErrorList {
|
||||
allErrs := errs.ValidationErrorList{}
|
||||
if secretSource.SecretName == "" {
|
||||
@ -426,6 +444,10 @@ func ValidatePersistentVolume(pv *api.PersistentVolume) errs.ValidationErrorList
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateGCEPersistentDiskVolumeSource(pv.Spec.GCEPersistentDisk).Prefix("persistentDisk")...)
|
||||
}
|
||||
if pv.Spec.AWSPersistentDisk != nil {
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateAWSPersistentDiskVolumeSource(pv.Spec.AWSPersistentDisk).Prefix("awsPersistentDisk")...)
|
||||
}
|
||||
if numVolumes != 1 {
|
||||
allErrs = append(allErrs, errs.NewFieldInvalid("", pv.Spec.PersistentVolumeSource, "exactly 1 volume type is required"))
|
||||
}
|
||||
@ -1021,6 +1043,7 @@ func ValidateReadOnlyPersistentDisks(volumes []api.Volume) errs.ValidationErrorL
|
||||
allErrs = append(allErrs, errs.NewFieldInvalid("GCEPersistentDisk.ReadOnly", false, "ReadOnly must be true for replicated pods > 1, as GCE PD can only be mounted on multiple machines if it is read-only."))
|
||||
}
|
||||
}
|
||||
// TODO: What to do for AWS? It doesn't support replicas
|
||||
}
|
||||
return allErrs
|
||||
}
|
||||
|
@ -516,6 +516,7 @@ func TestValidateVolumes(t *testing.T) {
|
||||
{Name: "abc-123", VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{"/mnt/path3"}}},
|
||||
{Name: "empty", VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}},
|
||||
{Name: "gcepd", VolumeSource: api.VolumeSource{GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{"my-PD", "ext4", 1, false}}},
|
||||
{Name: "awspd", VolumeSource: api.VolumeSource{AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{"my-PD", "ext4", 1, false}}},
|
||||
{Name: "gitrepo", VolumeSource: api.VolumeSource{GitRepo: &api.GitRepoVolumeSource{"my-repo", "hashstring"}}},
|
||||
{Name: "iscsidisk", VolumeSource: api.VolumeSource{ISCSI: &api.ISCSIVolumeSource{"127.0.0.1", "iqn.2015-02.example.com:test", 1, "ext4", false}}},
|
||||
{Name: "secret", VolumeSource: api.VolumeSource{Secret: &api.SecretVolumeSource{"my-secret"}}},
|
||||
|
@ -17,11 +17,15 @@ limitations under the License.
|
||||
package aws_cloud
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/gcfg"
|
||||
"github.com/mitchellh/goamz/aws"
|
||||
@ -38,6 +42,19 @@ import (
|
||||
type EC2 interface {
|
||||
// Query EC2 for instances matching the filter
|
||||
Instances(instIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error)
|
||||
|
||||
|
||||
// Attach a volume to an instance
|
||||
AttachVolume(volumeId string, instanceId string, mountDevice string) (resp *ec2.AttachVolumeResp, err error)
|
||||
// Detach a volume from whatever instance it is attached to
|
||||
// TODO: We should specify the InstanceID and the Device, for safety
|
||||
DetachVolume(volumeId string) (resp *ec2.SimpleResp, err error)
|
||||
// Lists volumes
|
||||
Volumes(volumeIds []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error)
|
||||
// Create an EBS volume
|
||||
CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error)
|
||||
// Delete an EBS volume
|
||||
DeleteVolume(volumeId string) (resp *ec2.SimpleResp, err error)
|
||||
}
|
||||
|
||||
// Abstraction over the AWS metadata service
|
||||
@ -46,12 +63,34 @@ type AWSMetadata interface {
|
||||
GetMetaData(key string) ([]byte, error)
|
||||
}
|
||||
|
||||
type VolumeOptions struct {
|
||||
CapacityMB int
|
||||
Zone string
|
||||
}
|
||||
|
||||
// Volumes is an interface for managing cloud-provisioned volumes
|
||||
type Volumes interface {
|
||||
// Attach the disk to the specified instance
|
||||
// instanceName can be empty to mean "the instance on which we are running"
|
||||
AttachDisk(instanceName string, volumeName string, readOnly bool) error
|
||||
// Detach the disk from the specified instance
|
||||
// instanceName can be empty to mean "the instance on which we are running"
|
||||
DetachDisk(instanceName string, volumeName string) error
|
||||
|
||||
// Create a volume with the specified options
|
||||
CreateVolume(volumeOptions *VolumeOptions) (volumeName string, err error)
|
||||
DeleteVolume(volumeName string) error
|
||||
}
|
||||
|
||||
// AWSCloud is an implementation of Interface, TCPLoadBalancer and Instances for Amazon Web Services.
|
||||
type AWSCloud struct {
|
||||
ec2 EC2
|
||||
cfg *AWSCloudConfig
|
||||
availabilityZone string
|
||||
region aws.Region
|
||||
|
||||
// The AWS instance that we are running on
|
||||
selfAwsInstance *awsInstance
|
||||
}
|
||||
|
||||
type AWSCloudConfig struct {
|
||||
@ -76,12 +115,12 @@ func (f *ec2InstanceFilter) Matches(instance ec2.Instance) bool {
|
||||
}
|
||||
|
||||
// goamzEC2 is an implementation of the EC2 interface, backed by goamz
|
||||
type GoamzEC2 struct {
|
||||
type goamzEC2 struct {
|
||||
ec2 *ec2.EC2
|
||||
}
|
||||
|
||||
// Implementation of EC2.Instances
|
||||
func (self *GoamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) {
|
||||
func (self *goamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) {
|
||||
var goamzFilter *ec2.Filter
|
||||
if filter != nil {
|
||||
goamzFilter = ec2.NewFilter()
|
||||
@ -106,6 +145,26 @@ func (self *goamzMetadata) GetMetaData(key string) ([]byte, error) {
|
||||
|
||||
type AuthFunc func() (auth aws.Auth, err error)
|
||||
|
||||
func (s *goamzEC2) AttachVolume(volumeId string, instanceId string, device string) (resp *ec2.AttachVolumeResp, err error) {
|
||||
return s.ec2.AttachVolume(volumeId, instanceId, device)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) DetachVolume(volumeId string) (resp *ec2.SimpleResp, err error) {
|
||||
return s.ec2.DetachVolume(volumeId)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) Volumes(volumeIds []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) {
|
||||
return s.ec2.Volumes(volumeIds, filter)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) {
|
||||
return s.ec2.CreateVolume(request)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) DeleteVolume(volumeId string) (resp *ec2.SimpleResp, err error) {
|
||||
return s.ec2.DeleteVolume(volumeId)
|
||||
}
|
||||
|
||||
func init() {
|
||||
cloudprovider.RegisterCloudProvider("aws", func(config io.Reader) (cloudprovider.Interface, error) {
|
||||
metadata := &goamzMetadata{}
|
||||
@ -179,12 +238,23 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW
|
||||
return nil, fmt.Errorf("not a valid AWS zone (unknown region): %s", zone)
|
||||
}
|
||||
|
||||
return &AWSCloud{
|
||||
ec2: &GoamzEC2{ec2: ec2.New(auth, region)},
|
||||
cfg: cfg,
|
||||
ec2 := &goamzEC2{ec2: ec2.New(auth, region)}
|
||||
|
||||
instanceId, err := ec2.GetMetaData("instance-id")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error fetching instance-id from ec2 metadata service: %v", err)
|
||||
}
|
||||
|
||||
awsCloud := &AWSCloud{
|
||||
ec2: ec2,
|
||||
cfg: cfg,
|
||||
region: region,
|
||||
availabilityZone: zone,
|
||||
}, nil
|
||||
}
|
||||
|
||||
awsCloud.selfAwsInstance = newAwsInstance(ec2, string(instanceId))
|
||||
|
||||
return awsCloud, nil
|
||||
}
|
||||
|
||||
func (aws *AWSCloud) Clusters() (cloudprovider.Clusters, bool) {
|
||||
@ -377,7 +447,7 @@ func getResourcesByInstanceType(instanceType string) (*api.NodeResources, error)
|
||||
return makeNodeResources("t1", 0.125, 0.615)
|
||||
|
||||
// t2: Burstable
|
||||
// TODO: The ECUs are fake values (because they are burstable), so this is just a guess...
|
||||
// TODO: The ECUs are fake values (because they are burstable), so this is just a guess...
|
||||
case "t2.micro":
|
||||
return makeNodeResources("t2", 0.25, 1)
|
||||
case "t2.small":
|
||||
@ -506,3 +576,345 @@ func (self *AWSCloud) GetZone() (cloudprovider.Zone, error) {
|
||||
Region: self.region.Name,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Abstraction around AWS Instance Types
|
||||
// There isn't an API to get information for a particular instance type (that I know of)
|
||||
type awsInstanceType struct {
|
||||
}
|
||||
|
||||
// TODO: Also return number of mounts allowed?
|
||||
func (self *awsInstanceType) getEbsMountDevices() []string {
|
||||
// See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html
|
||||
devices := []string{}
|
||||
for c := 'f'; c <= 'p'; c++ {
|
||||
devices = append(devices, fmt.Sprintf("/dev/sd%s", c))
|
||||
}
|
||||
return devices
|
||||
}
|
||||
|
||||
type awsInstance struct {
|
||||
ec2 EC2
|
||||
|
||||
// id in AWS
|
||||
awsId string
|
||||
|
||||
mutex sync.Mutex
|
||||
|
||||
// We must cache because otherwise there is a race condition,
|
||||
// where we assign a device mapping and then get a second request before we attach the volume
|
||||
deviceMappings map[string]string
|
||||
}
|
||||
|
||||
func newAwsInstance(ec2 EC2, awsId string) *awsInstance {
|
||||
self := &awsInstance{ec2: ec2, awsId: awsId}
|
||||
|
||||
// We lazy-init deviceMappings
|
||||
self.deviceMappings = nil
|
||||
|
||||
return self
|
||||
}
|
||||
|
||||
// Gets the awsInstanceType that models the instance type of this instance
|
||||
func (self *awsInstance) getInstanceType() *awsInstanceType {
|
||||
// TODO: Make this real
|
||||
awsInstanceType := &awsInstanceType{}
|
||||
return awsInstanceType
|
||||
}
|
||||
|
||||
// Gets the full information about this instance from the EC2 API
|
||||
func (self *awsInstance) getInfo() (*ec2.Instance, error) {
|
||||
resp, err := self.ec2.Instances([]string{self.awsId}, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error querying ec2 for instance info: %v", err)
|
||||
}
|
||||
if len(resp.Reservations) == 0 {
|
||||
return nil, fmt.Errorf("no reservations found for instance: %s", self.awsId)
|
||||
}
|
||||
if len(resp.Reservations) > 1 {
|
||||
return nil, fmt.Errorf("multiple reservations found for instance: %s", self.awsId)
|
||||
}
|
||||
if len(resp.Reservations[0].Instances) == 0 {
|
||||
return nil, fmt.Errorf("no instances found for instance: %s", self.awsId)
|
||||
}
|
||||
if len(resp.Reservations[0].Instances) > 1 {
|
||||
return nil, fmt.Errorf("multiple instances found for instance: %s", self.awsId)
|
||||
}
|
||||
return &resp.Reservations[0].Instances[0], nil
|
||||
}
|
||||
|
||||
func (self *awsInstance) assignMountDevice(volumeId string) (string, error) {
|
||||
instanceType := self.getInstanceType()
|
||||
if instanceType == nil {
|
||||
return "", fmt.Errorf("could not get instance type for instance: %s", self.awsId)
|
||||
}
|
||||
|
||||
// We lock to prevent concurrent mounts from conflicting
|
||||
// We may still conflict if someone calls the API concurrently,
|
||||
// but the AWS API will then fail one of the two attach operations
|
||||
self.mutex.Lock()
|
||||
defer self.mutex.Unlock()
|
||||
|
||||
// We cache both for efficiency and correctness
|
||||
if self.deviceMappings == nil {
|
||||
info, err := self.getInfo()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
deviceMappings := map[string]string{}
|
||||
for _, blockDevice := range info.BlockDevices {
|
||||
deviceMappings[blockDevice.DeviceName] = blockDevice.VolumeId
|
||||
}
|
||||
self.deviceMappings = deviceMappings
|
||||
}
|
||||
|
||||
// Check to see if this volume is already assigned a device on this machine
|
||||
for deviceName, mappingVolumeId := range self.deviceMappings {
|
||||
if volumeId == mappingVolumeId {
|
||||
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", deviceName, mappingVolumeId)
|
||||
return deviceName, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check all the valid mountpoints to see if any of them are free
|
||||
valid := instanceType.getEbsMountDevices()
|
||||
chosen := ""
|
||||
for _, device := range valid {
|
||||
_, found := self.deviceMappings[device]
|
||||
if !found {
|
||||
chosen = device
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if chosen == "" {
|
||||
glog.Warningf("Could not assign a mount device (all in use?). mappings=%v, valid=%v", self.deviceMappings, valid)
|
||||
return "", nil
|
||||
}
|
||||
|
||||
self.deviceMappings[chosen] = volumeId
|
||||
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeId)
|
||||
|
||||
return chosen, nil
|
||||
}
|
||||
|
||||
func (self *awsInstance) releaseMountDevice(volumeId string, mountDevice string) {
|
||||
self.mutex.Lock()
|
||||
defer self.mutex.Unlock()
|
||||
|
||||
existingVolumeId, found := self.deviceMappings[mountDevice]
|
||||
if !found {
|
||||
glog.Errorf("releaseMountDevice on non-allocated device")
|
||||
return
|
||||
}
|
||||
if volumeId != existingVolumeId {
|
||||
glog.Errorf("releaseMountDevice on device assigned to different volume")
|
||||
return
|
||||
}
|
||||
glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeId)
|
||||
delete(self.deviceMappings, mountDevice)
|
||||
}
|
||||
|
||||
type awsDisk struct {
|
||||
ec2 EC2
|
||||
|
||||
// Name in k8s
|
||||
name string
|
||||
// id in AWS
|
||||
awsId string
|
||||
// az which holds the volume
|
||||
az string
|
||||
}
|
||||
|
||||
func newAwsDisk(ec2 EC2, name string) (*awsDisk, error) {
|
||||
// name looks like aws://availability-zone/id
|
||||
url, err := url.Parse(name)
|
||||
if err != nil {
|
||||
// TODO: Maybe we should pass a URL into the Volume functions
|
||||
return nil, fmt.Errorf("Invalid disk name (%s): %v", name, err)
|
||||
}
|
||||
if url.Scheme != "aws" {
|
||||
return nil, fmt.Errorf("Invalid scheme for AWS volume (%s)", name)
|
||||
}
|
||||
awsId := url.Path
|
||||
// TODO: Regex match?
|
||||
if strings.Contains(awsId, "/") || !strings.HasPrefix(awsId, "vol-") {
|
||||
return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name)
|
||||
}
|
||||
az := url.Host
|
||||
// TODO: Better validation?
|
||||
// TODO: Default to our AZ? Look it up?
|
||||
// TODO: Should this be a region or an AZ?
|
||||
if az == "" {
|
||||
return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name)
|
||||
}
|
||||
disk := &awsDisk{ec2: ec2, name: name, awsId: awsId, az: az}
|
||||
return disk, nil
|
||||
}
|
||||
|
||||
// Gets the full information about this volume from the EC2 API
|
||||
func (self *awsDisk) getInfo() (*ec2.Volume, error) {
|
||||
resp, err := self.ec2.Volumes([]string{self.awsId}, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error querying ec2 for volume info: %v", err)
|
||||
}
|
||||
if len(resp.Volumes) == 0 {
|
||||
return nil, fmt.Errorf("no volumes found for volume: %s", self.awsId)
|
||||
}
|
||||
if len(resp.Volumes) > 1 {
|
||||
return nil, fmt.Errorf("multiple volumes found for volume: %s", self.awsId)
|
||||
}
|
||||
return &resp.Volumes[0], nil
|
||||
}
|
||||
|
||||
func (self *awsDisk) waitForAttachmentStatus(status string) error {
|
||||
attempt := 0
|
||||
maxAttempts := 60
|
||||
|
||||
for {
|
||||
info, err := self.getInfo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(info.Attachments) > 1 {
|
||||
glog.Warningf("Found multiple attachments for volume: %v", info)
|
||||
}
|
||||
attachmentStatus := ""
|
||||
for _, attachment := range info.Attachments {
|
||||
if attachment.Status == status {
|
||||
return nil
|
||||
}
|
||||
attachmentStatus = attachment.Status
|
||||
}
|
||||
glog.V(2).Infof("Waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
||||
|
||||
attempt++
|
||||
if attempt > maxAttempts {
|
||||
glog.Warningf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
||||
return errors.New("Timeout waiting for volume state")
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Deletes the EBS disk
|
||||
func (self *awsDisk) delete() error {
|
||||
_, err := self.ec2.DeleteVolume(self.awsId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error delete EBS volumes: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Gets the awsInstance for the EC2 instance on which we are running
|
||||
// may return nil in case of error
|
||||
func (aws *AWSCloud) getSelfAwsInstance() *awsInstance {
|
||||
// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance
|
||||
return aws.selfAwsInstance
|
||||
}
|
||||
|
||||
// Implements Volumes.AttachDisk
|
||||
func (aws *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly bool) error {
|
||||
disk, err := newAwsDisk(aws.ec2, diskName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var awsInstance *awsInstance
|
||||
if instanceName == "" {
|
||||
awsInstance = aws.selfAwsInstance
|
||||
} else {
|
||||
instance, err := aws.getInstancesByDnsName(instanceName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error finding instance: %v", err)
|
||||
}
|
||||
|
||||
awsInstance = newAwsInstance(aws.ec2, instance.InstanceId)
|
||||
}
|
||||
|
||||
if readOnly {
|
||||
// TODO: We could enforce this when we mount the volume (?)
|
||||
// TODO: We could also snapshot the volume and attach copies of it
|
||||
return errors.New("AWS volumes cannot be mounted read-only")
|
||||
}
|
||||
|
||||
mountDevice, err := awsInstance.assignMountDevice(disk.awsId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
attached := false
|
||||
defer func() {
|
||||
if !attached {
|
||||
awsInstance.releaseMountDevice(disk.awsId, mountDevice)
|
||||
}
|
||||
}()
|
||||
|
||||
attachResponse, err := aws.ec2.AttachVolume(disk.awsId, awsInstance.awsId, mountDevice)
|
||||
if err != nil {
|
||||
// TODO: Check if already attached?
|
||||
return fmt.Errorf("Error attaching EBS volume: %v", err)
|
||||
}
|
||||
|
||||
glog.V(2).Info("AttachVolume request returned %v", attachResponse)
|
||||
|
||||
err = disk.waitForAttachmentStatus("attached")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
attached = true
|
||||
|
||||
// TODO: Return device name (and note that it might look like /dev/xvdf, not /dev/sdf)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Implements Volumes.DetachDisk
|
||||
func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
||||
disk, err := newAwsDisk(aws.ec2, diskName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: We should specify the InstanceID and the Device, for safety
|
||||
response, err := aws.ec2.DetachVolume(disk.awsId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error detaching EBS volume: %v", err)
|
||||
}
|
||||
if response == nil {
|
||||
return errors.New("no response from DetachVolume")
|
||||
}
|
||||
err = disk.waitForAttachmentStatus("detached")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Implements Volumes.CreateVolume
|
||||
func (aws *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
||||
request := &ec2.CreateVolume{}
|
||||
request.AvailZone = volumeOptions.Zone
|
||||
request.Size = (int64(volumeOptions.CapacityMB) + 1023) / 1024
|
||||
response, err := aws.ec2.CreateVolume(request)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
az := response.AvailZone
|
||||
awsId := response.VolumeId
|
||||
|
||||
volumeName := "aws://" + az + "/" + awsId
|
||||
|
||||
return volumeName, nil
|
||||
}
|
||||
|
||||
// Implements Volumes.DeleteVolume
|
||||
func (aws *AWSCloud) DeleteVolume(volumeName string) error {
|
||||
awsDisk, err := newAwsDisk(aws.ec2, volumeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return awsDisk.delete()
|
||||
}
|
||||
|
@ -50,16 +50,26 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
|
||||
}
|
||||
|
||||
func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
|
||||
if volume.GCEPersistentDisk == nil {
|
||||
return false
|
||||
}
|
||||
pdName := volume.GCEPersistentDisk.PDName
|
||||
if volume.GCEPersistentDisk != nil {
|
||||
pdName := volume.GCEPersistentDisk.PDName
|
||||
|
||||
manifest := &(pod.Spec)
|
||||
for ix := range manifest.Volumes {
|
||||
if manifest.Volumes[ix].GCEPersistentDisk != nil &&
|
||||
manifest.Volumes[ix].GCEPersistentDisk.PDName == pdName {
|
||||
return true
|
||||
manifest := &(pod.Spec)
|
||||
for ix := range manifest.Volumes {
|
||||
if manifest.Volumes[ix].GCEPersistentDisk != nil &&
|
||||
manifest.Volumes[ix].GCEPersistentDisk.PDName == pdName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
if volume.AWSPersistentDisk != nil {
|
||||
pdName := volume.AWSPersistentDisk.PDName
|
||||
|
||||
manifest := &(pod.Spec)
|
||||
for ix := range manifest.Volumes {
|
||||
if manifest.Volumes[ix].AWSPersistentDisk != nil &&
|
||||
manifest.Volumes[ix].AWSPersistentDisk.PDName == pdName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
@ -321,6 +321,55 @@ func TestDiskConflicts(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAWSDiskConflicts(t *testing.T) {
|
||||
volState := api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{
|
||||
PDName: "foo",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
volState2 := api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{
|
||||
PDName: "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
tests := []struct {
|
||||
pod api.Pod
|
||||
existingPods []api.Pod
|
||||
isOk bool
|
||||
test string
|
||||
}{
|
||||
{api.Pod{}, []api.Pod{}, true, "nothing"},
|
||||
{api.Pod{}, []api.Pod{{Spec: volState}}, true, "one state"},
|
||||
{api.Pod{Spec: volState}, []api.Pod{{Spec: volState}}, false, "same state"},
|
||||
{api.Pod{Spec: volState2}, []api.Pod{{Spec: volState}}, true, "different state"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
ok, err := NoDiskConflict(test.pod, test.existingPods, "machine")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if test.isOk && !ok {
|
||||
t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test)
|
||||
}
|
||||
if !test.isOk && ok {
|
||||
t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodFitsSelector(t *testing.T) {
|
||||
tests := []struct {
|
||||
pod api.Pod
|
||||
|
287
pkg/volume/aws_pd/aws_pd.go
Normal file
287
pkg/volume/aws_pd/aws_pd.go
Normal file
@ -0,0 +1,287 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_pd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// This is the primary entrypoint for volume plugins.
|
||||
func ProbeVolumePlugins() []volume.VolumePlugin {
|
||||
return []volume.VolumePlugin{&awsPersistentDiskPlugin{nil}}
|
||||
}
|
||||
|
||||
type awsPersistentDiskPlugin struct {
|
||||
host volume.VolumeHost
|
||||
}
|
||||
|
||||
var _ volume.VolumePlugin = &awsPersistentDiskPlugin{}
|
||||
|
||||
const (
|
||||
awsPersistentDiskPluginName = "kubernetes.io/aws-pd"
|
||||
)
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) Init(host volume.VolumeHost) {
|
||||
plugin.host = host
|
||||
}
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) Name() string {
|
||||
return awsPersistentDiskPluginName
|
||||
}
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) CanSupport(spec *api.Volume) bool {
|
||||
if spec.AWSPersistentDisk != nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) GetAccessModes() []api.AccessModeType {
|
||||
return []api.AccessModeType{
|
||||
api.ReadWriteOnce,
|
||||
}
|
||||
}
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) {
|
||||
// Inject real implementations here, test through the internal function.
|
||||
return plugin.newBuilderInternal(spec, podRef.UID, &AWSDiskUtil{}, mount.New())
|
||||
}
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) newBuilderInternal(spec *api.Volume, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Builder, error) {
|
||||
pdName := spec.AWSPersistentDisk.PDName
|
||||
fsType := spec.AWSPersistentDisk.FSType
|
||||
partition := ""
|
||||
if spec.AWSPersistentDisk.Partition != 0 {
|
||||
partition = strconv.Itoa(spec.AWSPersistentDisk.Partition)
|
||||
}
|
||||
readOnly := spec.AWSPersistentDisk.ReadOnly
|
||||
|
||||
return &awsPersistentDisk{
|
||||
podUID: podUID,
|
||||
volName: spec.Name,
|
||||
pdName: pdName,
|
||||
fsType: fsType,
|
||||
partition: partition,
|
||||
readOnly: readOnly,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
diskMounter: &awsSafeFormatAndMount{mounter, exec.New()},
|
||||
plugin: plugin,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
|
||||
// Inject real implementations here, test through the internal function.
|
||||
return plugin.newCleanerInternal(volName, podUID, &AWSDiskUtil{}, mount.New())
|
||||
}
|
||||
|
||||
func (plugin *awsPersistentDiskPlugin) newCleanerInternal(volName string, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Cleaner, error) {
|
||||
return &awsPersistentDisk{
|
||||
podUID: podUID,
|
||||
volName: volName,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
diskMounter: &awsSafeFormatAndMount{mounter, exec.New()},
|
||||
plugin: plugin,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Abstract interface to PD operations.
|
||||
type pdManager interface {
|
||||
// Attaches the disk to the kubelet's host machine.
|
||||
AttachAndMountDisk(pd *awsPersistentDisk, globalPDPath string) error
|
||||
// Detaches the disk from the kubelet's host machine.
|
||||
DetachDisk(pd *awsPersistentDisk) error
|
||||
}
|
||||
|
||||
// awsPersistentDisk volumes are disk resources provided by Google Compute Engine
|
||||
// that are attached to the kubelet's host machine and exposed to the pod.
|
||||
type awsPersistentDisk struct {
|
||||
volName string
|
||||
podUID types.UID
|
||||
// Unique name of the PD, used to find the disk resource in the provider.
|
||||
pdName string
|
||||
// Filesystem type, optional.
|
||||
fsType string
|
||||
// Specifies the partition to mount
|
||||
partition string
|
||||
// Specifies whether the disk will be attached as read-only.
|
||||
readOnly bool
|
||||
// Utility interface that provides API calls to the provider to attach/detach disks.
|
||||
manager pdManager
|
||||
// Mounter interface that provides system calls to mount the global path to the pod local path.
|
||||
mounter mount.Interface
|
||||
// diskMounter provides the interface that is used to mount the actual block device.
|
||||
diskMounter mount.Interface
|
||||
plugin *awsPersistentDiskPlugin
|
||||
}
|
||||
|
||||
func detachDiskLogError(pd *awsPersistentDisk) {
|
||||
err := pd.manager.DetachDisk(pd)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to detach disk: %v (%v)", pd, err)
|
||||
}
|
||||
}
|
||||
|
||||
// getVolumeProvider returns the AWS Volumes interface
|
||||
func (pd *awsPersistentDisk) getVolumeProvider() (aws_cloud.Volumes, error) {
|
||||
name := "aws"
|
||||
cloud, err := cloudprovider.GetCloudProvider(name, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
volumes, ok := cloud.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Cloud provider does not support volumes")
|
||||
}
|
||||
return volumes, nil
|
||||
}
|
||||
|
||||
// SetUp attaches the disk and bind mounts to the volume path.
|
||||
func (pd *awsPersistentDisk) SetUp() error {
|
||||
return pd.SetUpAt(pd.GetPath())
|
||||
}
|
||||
|
||||
// SetUpAt attaches the disk and bind mounts to the volume path.
|
||||
func (pd *awsPersistentDisk) SetUpAt(dir string) error {
|
||||
// TODO: handle failed mounts here.
|
||||
mountpoint, err := mount.IsMountPoint(dir)
|
||||
glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, mountpoint, err)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
if mountpoint {
|
||||
return nil
|
||||
}
|
||||
|
||||
globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName)
|
||||
if err := pd.manager.AttachAndMountDisk(pd, globalPDPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
flags := uintptr(0)
|
||||
if pd.readOnly {
|
||||
flags = mount.FlagReadOnly
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
||||
// TODO: we should really eject the attach/detach out into its own control loop.
|
||||
detachDiskLogError(pd)
|
||||
return err
|
||||
}
|
||||
|
||||
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
|
||||
err = pd.mounter.Mount(globalPDPath, dir, "", mount.FlagBind|flags, "")
|
||||
if err != nil {
|
||||
mountpoint, mntErr := mount.IsMountPoint(dir)
|
||||
if mntErr != nil {
|
||||
glog.Errorf("isMountpoint check failed: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
if mountpoint {
|
||||
if mntErr = pd.mounter.Unmount(dir, 0); mntErr != nil {
|
||||
glog.Errorf("Failed to unmount: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
mountpoint, mntErr := mount.IsMountPoint(dir)
|
||||
if mntErr != nil {
|
||||
glog.Errorf("isMountpoint check failed: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
if mountpoint {
|
||||
// This is very odd, we don't expect it. We'll try again next sync loop.
|
||||
glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
|
||||
return err
|
||||
}
|
||||
}
|
||||
os.Remove(dir)
|
||||
// TODO: we should really eject the attach/detach out into its own control loop.
|
||||
detachDiskLogError(pd)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeGlobalPDName(host volume.VolumeHost, devName string) string {
|
||||
return path.Join(host.GetPluginDir(awsPersistentDiskPluginName), "mounts", devName)
|
||||
}
|
||||
|
||||
func (pd *awsPersistentDisk) GetPath() string {
|
||||
name := awsPersistentDiskPluginName
|
||||
return pd.plugin.host.GetPodVolumeDir(pd.podUID, util.EscapeQualifiedNameForDisk(name), pd.volName)
|
||||
}
|
||||
|
||||
// Unmounts the bind mount, and detaches the disk only if the PD
|
||||
// resource was the last reference to that disk on the kubelet.
|
||||
func (pd *awsPersistentDisk) TearDown() error {
|
||||
return pd.TearDownAt(pd.GetPath())
|
||||
}
|
||||
|
||||
// Unmounts the bind mount, and detaches the disk only if the PD
|
||||
// resource was the last reference to that disk on the kubelet.
|
||||
func (pd *awsPersistentDisk) TearDownAt(dir string) error {
|
||||
mountpoint, err := mount.IsMountPoint(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !mountpoint {
|
||||
return os.Remove(dir)
|
||||
}
|
||||
|
||||
refs, err := mount.GetMountRefs(pd.mounter, dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Unmount the bind-mount inside this pod
|
||||
if err := pd.mounter.Unmount(dir, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
// If len(refs) is 1, then all bind mounts have been removed, and the
|
||||
// remaining reference is the global mount. It is safe to detach.
|
||||
if len(refs) == 1 {
|
||||
// pd.pdName is not initially set for volume-cleaners, so set it here.
|
||||
pd.pdName = path.Base(refs[0])
|
||||
if err := pd.manager.DetachDisk(pd); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
mountpoint, mntErr := mount.IsMountPoint(dir)
|
||||
if mntErr != nil {
|
||||
glog.Errorf("isMountpoint check failed: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
if !mountpoint {
|
||||
if err := os.Remove(dir); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
153
pkg/volume/aws_pd/aws_pd_test.go
Normal file
153
pkg/volume/aws_pd/aws_pd_test.go
Normal file
@ -0,0 +1,153 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_pd
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
func TestCanSupport(t *testing.T) {
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||
|
||||
plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-pd")
|
||||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
if plug.Name() != "kubernetes.io/aws-pd" {
|
||||
t.Errorf("Wrong name: %s", plug.Name())
|
||||
}
|
||||
if !plug.CanSupport(&api.Volume{VolumeSource: api.VolumeSource{AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{}}}) {
|
||||
t.Errorf("Expected true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAccessModes(t *testing.T) {
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||
|
||||
plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/aws-pd")
|
||||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
if !contains(plug.GetAccessModes(), api.ReadWriteOnce) || !contains(plug.GetAccessModes(), api.ReadOnlyMany) {
|
||||
t.Errorf("Expected two AccessModeTypes: %s and %s", api.ReadWriteOnce, api.ReadOnlyMany)
|
||||
}
|
||||
}
|
||||
|
||||
func contains(modes []api.AccessModeType, mode api.AccessModeType) bool {
|
||||
for _, m := range modes {
|
||||
if m == mode {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type fakePDManager struct{}
|
||||
|
||||
// TODO(jonesdl) To fully test this, we could create a loopback device
|
||||
// and mount that instead.
|
||||
func (fake *fakePDManager) AttachAndMountDisk(pd *awsPersistentDisk, globalPDPath string) error {
|
||||
globalPath := makeGlobalPDName(pd.plugin.host, pd.pdName)
|
||||
err := os.MkdirAll(globalPath, 0750)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakePDManager) DetachDisk(pd *awsPersistentDisk) error {
|
||||
globalPath := makeGlobalPDName(pd.plugin.host, pd.pdName)
|
||||
err := os.RemoveAll(globalPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPlugin(t *testing.T) {
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||
|
||||
plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-pd")
|
||||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
spec := &api.Volume{
|
||||
Name: "vol1",
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{
|
||||
PDName: "pd",
|
||||
FSType: "ext4",
|
||||
},
|
||||
},
|
||||
}
|
||||
builder, err := plug.(*awsPersistentDiskPlugin).newBuilderInternal(spec, types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Builder: %v", err)
|
||||
}
|
||||
if builder == nil {
|
||||
t.Errorf("Got a nil Builder: %v")
|
||||
}
|
||||
|
||||
path := builder.GetPath()
|
||||
if path != "/tmp/fake/pods/poduid/volumes/kubernetes.io~aws-pd/vol1" {
|
||||
t.Errorf("Got unexpected path: %s", path)
|
||||
}
|
||||
|
||||
if err := builder.SetUp(); err != nil {
|
||||
t.Errorf("Expected success, got: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed, volume path not created: %s", path)
|
||||
} else {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed, volume path not created: %s", path)
|
||||
} else {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
cleaner, err := plug.(*awsPersistentDiskPlugin).newCleanerInternal("vol1", types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Cleaner: %v", err)
|
||||
}
|
||||
if cleaner == nil {
|
||||
t.Errorf("Got a nil Cleaner: %v")
|
||||
}
|
||||
|
||||
if err := cleaner.TearDown(); err != nil {
|
||||
t.Errorf("Expected success, got: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
t.Errorf("TearDown() failed, volume path still exists: %s", path)
|
||||
} else if !os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
140
pkg/volume/aws_pd/aws_util.go
Normal file
140
pkg/volume/aws_pd/aws_util.go
Normal file
@ -0,0 +1,140 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_pd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
type AWSDiskUtil struct{}
|
||||
|
||||
// Attaches a disk specified by a volume.AWSPersistentDisk to the current kubelet.
|
||||
// Mounts the disk to it's global path.
|
||||
func (util *AWSDiskUtil) AttachAndMountDisk(pd *awsPersistentDisk, globalPDPath string) error {
|
||||
volumes, err := pd.getVolumeProvider()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
flags := uintptr(0)
|
||||
if pd.readOnly {
|
||||
flags = mount.FlagReadOnly
|
||||
}
|
||||
if err := volumes.AttachDisk("", pd.pdName, pd.readOnly); err != nil {
|
||||
return err
|
||||
}
|
||||
devicePath := path.Join("/dev/disk/by-id/", "aws-"+pd.pdName)
|
||||
if pd.partition != "" {
|
||||
devicePath = devicePath + "-part" + pd.partition
|
||||
}
|
||||
//TODO(jonesdl) There should probably be better method than busy-waiting here.
|
||||
numTries := 0
|
||||
for {
|
||||
_, err := os.Stat(devicePath)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
numTries++
|
||||
if numTries == 10 {
|
||||
return errors.New("Could not attach disk: Timeout after 10s")
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
// Only mount the PD globally once.
|
||||
mountpoint, err := mount.IsMountPoint(globalPDPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(globalPDPath, 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
mountpoint = false
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !mountpoint {
|
||||
err = pd.diskMounter.Mount(devicePath, globalPDPath, pd.fsType, flags, "")
|
||||
if err != nil {
|
||||
os.Remove(globalPDPath)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unmounts the device and detaches the disk from the kubelet's host machine.
|
||||
func (util *AWSDiskUtil) DetachDisk(pd *awsPersistentDisk) error {
|
||||
// Unmount the global PD mount, which should be the only one.
|
||||
globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName)
|
||||
if err := pd.mounter.Unmount(globalPDPath, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(globalPDPath); err != nil {
|
||||
return err
|
||||
}
|
||||
// Detach the disk
|
||||
volumes, err := pd.getVolumeProvider()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := volumes.DetachDisk("", pd.pdName); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// safe_format_and_mount is a utility script on AWS VMs that probes a persistent disk, and if
|
||||
// necessary formats it before mounting it.
|
||||
// This eliminates the necesisty to format a PD before it is used with a Pod on AWS.
|
||||
// TODO: port this script into Go and use it for all Linux platforms
|
||||
type awsSafeFormatAndMount struct {
|
||||
mount.Interface
|
||||
runner exec.Interface
|
||||
}
|
||||
|
||||
// uses /usr/share/google/safe_format_and_mount to optionally mount, and format a disk
|
||||
func (mounter *awsSafeFormatAndMount) Mount(source string, target string, fstype string, flags uintptr, data string) error {
|
||||
// Don't attempt to format if mounting as readonly. Go straight to mounting.
|
||||
if (flags & mount.FlagReadOnly) != 0 {
|
||||
return mounter.Interface.Mount(source, target, fstype, flags, data)
|
||||
}
|
||||
args := []string{}
|
||||
// ext4 is the default for safe_format_and_mount
|
||||
if len(fstype) > 0 && fstype != "ext4" {
|
||||
args = append(args, "-m", fmt.Sprintf("mkfs.%s", fstype))
|
||||
}
|
||||
args = append(args, source, target)
|
||||
// TODO: Accept other options here?
|
||||
glog.V(5).Infof("exec-ing: /usr/share/google/safe_format_and_mount %v", args)
|
||||
cmd := mounter.runner.Command("/usr/share/google/safe_format_and_mount", args...)
|
||||
dataOut, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
glog.V(5).Infof("error running /usr/share/google/safe_format_and_mount\n%s", string(dataOut))
|
||||
}
|
||||
return err
|
||||
}
|
84
pkg/volume/aws_pd/aws_util_test.go
Normal file
84
pkg/volume/aws_pd/aws_util_test.go
Normal file
@ -0,0 +1,84 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_pd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
||||
)
|
||||
|
||||
func TestSafeFormatAndMount(t *testing.T) {
|
||||
tests := []struct {
|
||||
fstype string
|
||||
expectedArgs []string
|
||||
err error
|
||||
}{
|
||||
{
|
||||
fstype: "ext4",
|
||||
expectedArgs: []string{"/dev/foo", "/mnt/bar"},
|
||||
},
|
||||
{
|
||||
fstype: "vfat",
|
||||
expectedArgs: []string{"-m", "mkfs.vfat", "/dev/foo", "/mnt/bar"},
|
||||
},
|
||||
{
|
||||
err: fmt.Errorf("test error"),
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
|
||||
var cmdOut string
|
||||
var argsOut []string
|
||||
fake := exec.FakeExec{
|
||||
CommandScript: []exec.FakeCommandAction{
|
||||
func(cmd string, args ...string) exec.Cmd {
|
||||
cmdOut = cmd
|
||||
argsOut = args
|
||||
fake := exec.FakeCmd{
|
||||
CombinedOutputScript: []exec.FakeCombinedOutputAction{
|
||||
func() ([]byte, error) { return []byte{}, test.err },
|
||||
},
|
||||
}
|
||||
return exec.InitFakeCmd(&fake, cmd, args...)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mounter := awsSafeFormatAndMount{
|
||||
runner: &fake,
|
||||
}
|
||||
|
||||
err := mounter.Mount("/dev/foo", "/mnt/bar", test.fstype, 0, "")
|
||||
if test.err == nil && err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if test.err != nil {
|
||||
if err == nil {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
return
|
||||
}
|
||||
if cmdOut != "/usr/share/google/safe_format_and_mount" {
|
||||
t.Errorf("unexpected command: %s", cmdOut)
|
||||
}
|
||||
if len(argsOut) != len(test.expectedArgs) {
|
||||
t.Errorf("unexpected args: %v, expected: %v", argsOut, test.expectedArgs)
|
||||
}
|
||||
}
|
||||
}
|
@ -25,6 +25,7 @@ import (
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/golang/glog"
|
||||
"github.com/onsi/ginkgo"
|
||||
@ -35,10 +36,12 @@ import (
|
||||
|
||||
type testResult bool
|
||||
|
||||
type GCEConfig struct {
|
||||
type CloudConfig struct {
|
||||
ProjectID string
|
||||
Zone string
|
||||
MasterName string
|
||||
|
||||
Provider cloudprovider.Interface
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
156
test/e2e/pd.go
156
test/e2e/pd.go
@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
@ -35,7 +36,6 @@ var _ = Describe("PD", func() {
|
||||
var (
|
||||
c *client.Client
|
||||
podClient client.PodInterface
|
||||
diskName string
|
||||
host0Name string
|
||||
host1Name string
|
||||
)
|
||||
@ -51,37 +51,37 @@ var _ = Describe("PD", func() {
|
||||
expectNoError(err, "Failed to list nodes for e2e cluster.")
|
||||
Expect(len(nodes.Items) >= 2).To(BeTrue())
|
||||
|
||||
diskName = fmt.Sprintf("e2e-%s", string(util.NewUUID()))
|
||||
host0Name = nodes.Items[0].ObjectMeta.Name
|
||||
host1Name = nodes.Items[1].ObjectMeta.Name
|
||||
})
|
||||
|
||||
It("should schedule a pod w/ a RW PD, remove it, then schedule it on another host", func() {
|
||||
if testContext.Provider != "gce" {
|
||||
By(fmt.Sprintf("Skipping PD test, which is only supported for provider gce (not %s)",
|
||||
testContext.Provider))
|
||||
if testContext.provider != "gce" && testContext.provider != "aws" {
|
||||
By(fmt.Sprintf("Skipping PD test, which is only supported for providers gce & aws (not %s)",
|
||||
testContext.provider))
|
||||
return
|
||||
}
|
||||
|
||||
By("creating PD")
|
||||
diskName, err := createPD()
|
||||
expectNoError(err, "Error creating PD")
|
||||
|
||||
host0Pod := testPDPod(diskName, host0Name, false)
|
||||
host1Pod := testPDPod(diskName, host1Name, false)
|
||||
|
||||
By(fmt.Sprintf("creating PD %q", diskName))
|
||||
expectNoError(createPD(diskName, testContext.GCEConfig.Zone), "Error creating PD")
|
||||
|
||||
defer func() {
|
||||
By("cleaning up PD-RW test environment")
|
||||
// Teardown pods, PD. Ignore errors.
|
||||
// Teardown should do nothing unless test failed.
|
||||
podClient.Delete(host0Pod.Name)
|
||||
podClient.Delete(host1Pod.Name)
|
||||
detachPD(host0Name, diskName, testContext.GCEConfig.Zone)
|
||||
detachPD(host1Name, diskName, testContext.GCEConfig.Zone)
|
||||
deletePD(diskName, testContext.GCEConfig.Zone)
|
||||
detachPD(host0Name, diskName)
|
||||
detachPD(host1Name, diskName)
|
||||
deletePD(diskName)
|
||||
}()
|
||||
|
||||
By("submitting host0Pod to kubernetes")
|
||||
_, err := podClient.Create(host0Pod)
|
||||
_, err = podClient.Create(host0Pod)
|
||||
expectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err))
|
||||
|
||||
expectNoError(waitForPodRunning(c, host0Pod.Name))
|
||||
@ -100,7 +100,7 @@ var _ = Describe("PD", func() {
|
||||
|
||||
By(fmt.Sprintf("deleting PD %q", diskName))
|
||||
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
|
||||
if err = deletePD(diskName, testContext.GCEConfig.Zone); err != nil {
|
||||
if err = deletePD(diskName); err != nil {
|
||||
Logf("Couldn't delete PD. Sleeping 5 seconds")
|
||||
continue
|
||||
}
|
||||
@ -118,6 +118,10 @@ var _ = Describe("PD", func() {
|
||||
return
|
||||
}
|
||||
|
||||
By("creating PD")
|
||||
diskName, err := createPD()
|
||||
expectNoError(err, "Error creating PD")
|
||||
|
||||
rwPod := testPDPod(diskName, host0Name, false)
|
||||
host0ROPod := testPDPod(diskName, host0Name, true)
|
||||
host1ROPod := testPDPod(diskName, host1Name, true)
|
||||
@ -129,16 +133,14 @@ var _ = Describe("PD", func() {
|
||||
podClient.Delete(rwPod.Name)
|
||||
podClient.Delete(host0ROPod.Name)
|
||||
podClient.Delete(host1ROPod.Name)
|
||||
detachPD(host0Name, diskName, testContext.GCEConfig.Zone)
|
||||
detachPD(host1Name, diskName, testContext.GCEConfig.Zone)
|
||||
deletePD(diskName, testContext.GCEConfig.Zone)
|
||||
|
||||
detachPD(host0Name, diskName)
|
||||
detachPD(host1Name, diskName)
|
||||
deletePD(diskName)
|
||||
}()
|
||||
|
||||
By(fmt.Sprintf("creating PD %q", diskName))
|
||||
expectNoError(createPD(diskName, testContext.GCEConfig.Zone), "Error creating PD")
|
||||
|
||||
By("submitting rwPod to ensure PD is formatted")
|
||||
_, err := podClient.Create(rwPod)
|
||||
_, err = podClient.Create(rwPod)
|
||||
expectNoError(err, "Failed to create rwPod")
|
||||
expectNoError(waitForPodRunning(c, rwPod.Name))
|
||||
expectNoError(podClient.Delete(rwPod.Name), "Failed to delete host0Pod")
|
||||
@ -163,7 +165,7 @@ var _ = Describe("PD", func() {
|
||||
|
||||
By(fmt.Sprintf("deleting PD %q", diskName))
|
||||
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
|
||||
if err = deletePD(diskName, testContext.GCEConfig.Zone); err != nil {
|
||||
if err = deletePD(diskName); err != nil {
|
||||
Logf("Couldn't delete PD. Sleeping 5 seconds")
|
||||
continue
|
||||
}
|
||||
@ -173,24 +175,70 @@ var _ = Describe("PD", func() {
|
||||
})
|
||||
})
|
||||
|
||||
func createPD(pdName, zone string) error {
|
||||
// TODO: make this hit the compute API directly instread of shelling out to gcloud.
|
||||
return exec.Command("gcloud", "compute", "disks", "create", "--zone="+zone, "--size=10GB", pdName).Run()
|
||||
func createPD() (string, error) {
|
||||
if testContext.provider == "gce" {
|
||||
pdName := fmt.Sprintf("e2e-%s", string(util.NewUUID()))
|
||||
|
||||
zone := testContext.cloudConfig.Zone
|
||||
// TODO: make this hit the compute API directly instread of shelling out to gcloud.
|
||||
err := exec.Command("gcloud", "compute", "disks", "create", "--zone="+zone, "--size=10GB", pdName).Run()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return pdName, nil
|
||||
} else if testContext.provider == "aws" {
|
||||
volumes, ok := testContext.cloudConfig.Provider.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("Provider does not implement volumes interface")
|
||||
}
|
||||
volumeOptions := &aws_cloud.VolumeOptions{}
|
||||
volumeOptions.CapacityMB = 10 * 1024
|
||||
volumeOptions.Zone = testContext.cloudConfig.Zone
|
||||
return volumes.CreateVolume(volumeOptions)
|
||||
} else {
|
||||
return "", fmt.Errorf("Unsupported provider type")
|
||||
}
|
||||
}
|
||||
|
||||
func deletePD(pdName, zone string) error {
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "disks", "delete", "--zone="+zone, pdName).Run()
|
||||
func deletePD(pdName string) error {
|
||||
if testContext.provider == "gce" {
|
||||
zone := testContext.cloudConfig.Zone
|
||||
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "disks", "delete", "--zone="+zone, pdName).Run()
|
||||
} else if testContext.provider == "aws" {
|
||||
volumes, ok := testContext.cloudConfig.Provider.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return fmt.Errorf("Provider does not implement volumes interface")
|
||||
}
|
||||
return volumes.DeleteVolume(pdName)
|
||||
} else {
|
||||
return fmt.Errorf("Unsupported provider type")
|
||||
}
|
||||
}
|
||||
|
||||
func detachPD(hostName, pdName, zone string) error {
|
||||
instanceName := strings.Split(hostName, ".")[0]
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "instances", "detach-disk", "--zone="+zone, "--disk="+pdName, instanceName).Run()
|
||||
func detachPD(hostName, pdName string) error {
|
||||
if testContext.provider == "gce" {
|
||||
instanceName := strings.Split(hostName, ".")[0]
|
||||
|
||||
zone := testContext.cloudConfig.Zone
|
||||
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "instances", "detach-disk", "--zone="+zone, "--disk="+pdName, instanceName).Run()
|
||||
} else if testContext.provider == "aws" {
|
||||
volumes, ok := testContext.cloudConfig.Provider.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return fmt.Errorf("Provider does not implement volumes interface")
|
||||
}
|
||||
return volumes.DetachDisk(hostName, pdName)
|
||||
} else {
|
||||
return fmt.Errorf("Unsupported provider type")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
|
||||
return &api.Pod{
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: "v1beta1",
|
||||
@ -199,18 +247,6 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
|
||||
Name: "pd-test-" + string(util.NewUUID()),
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
Name: "testpd",
|
||||
VolumeSource: api.VolumeSource{
|
||||
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
|
||||
PDName: diskName,
|
||||
FSType: "ext4",
|
||||
ReadOnly: readOnly,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "testpd",
|
||||
@ -226,4 +262,36 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
|
||||
Host: targetHost,
|
||||
},
|
||||
}
|
||||
|
||||
if testContext.provider == "gce" {
|
||||
pod.Spec.Volumes = []api.Volume{
|
||||
{
|
||||
Name: "testpd",
|
||||
VolumeSource: api.VolumeSource{
|
||||
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
|
||||
PDName: diskName,
|
||||
FSType: "ext4",
|
||||
ReadOnly: readOnly,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else if testContext.provider == "aws" {
|
||||
pod.Spec.Volumes = []api.Volume{
|
||||
{
|
||||
Name: "testpd",
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{
|
||||
PDName: diskName,
|
||||
FSType: "ext4",
|
||||
ReadOnly: readOnly,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
panic("Unknown provider: " + testContext.provider)
|
||||
}
|
||||
|
||||
return pod
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user