From edf0292d4a27a7c8ad6cabd62589bb4e90d6b120 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Fri, 6 Mar 2015 06:26:39 -0800 Subject: [PATCH] Add initial support for Volumes to AWS --- cmd/e2e/e2e.go | 22 ++ cmd/kubelet/app/plugins.go | 2 + pkg/api/types.go | 26 ++ pkg/api/v1beta1/conversion.go | 7 + pkg/api/v1beta1/types.go | 31 +- pkg/api/v1beta2/conversion.go | 3 + pkg/api/v1beta2/types.go | 31 +- pkg/api/v1beta3/types.go | 29 ++ pkg/api/validation/validation.go | 23 ++ pkg/api/validation/validation_test.go | 1 + pkg/cloudprovider/aws/aws.go | 426 +++++++++++++++++++++++++- pkg/scheduler/predicates.go | 28 +- pkg/scheduler/predicates_test.go | 49 +++ pkg/volume/aws_pd/aws_pd.go | 287 +++++++++++++++++ pkg/volume/aws_pd/aws_pd_test.go | 153 +++++++++ pkg/volume/aws_pd/aws_util.go | 140 +++++++++ pkg/volume/aws_pd/aws_util_test.go | 84 +++++ test/e2e/driver.go | 5 +- test/e2e/pd.go | 156 +++++++--- 19 files changed, 1440 insertions(+), 63 deletions(-) create mode 100644 pkg/volume/aws_pd/aws_pd.go create mode 100644 pkg/volume/aws_pd/aws_pd_test.go create mode 100644 pkg/volume/aws_pd/aws_util.go create mode 100644 pkg/volume/aws_pd/aws_util_test.go diff --git a/cmd/e2e/e2e.go b/cmd/e2e/e2e.go index 4d1eef569bb..d06daa9165a 100644 --- a/cmd/e2e/e2e.go +++ b/cmd/e2e/e2e.go @@ -17,10 +17,13 @@ limitations under the License. package main import ( + "fmt" "os" goruntime "runtime" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/test/e2e" "github.com/golang/glog" @@ -47,6 +50,7 @@ func init() { flag.StringVar(&context.Host, "host", "", "The host, or apiserver, to connect to") flag.StringVar(&context.RepoRoot, "repo_root", "./", "Root directory of kubernetes repository, for finding test files. Default assumes working directory is repository root") flag.StringVar(&context.Provider, "provider", "", "The name of the Kubernetes provider (gce, gke, local, vagrant, etc.)") + flag.StringVar(&gceConfig.MasterName, "kube_master", "", "Name of the kubernetes master. Only required if provider is gce or gke") flag.StringVar(&gceConfig.ProjectID, "gce_project", "", "The GCE project being used, if applicable") flag.StringVar(&gceConfig.Zone, "gce_zone", "", "GCE zone being used, if applicable") @@ -63,5 +67,23 @@ func main() { glog.Error("Invalid --times (negative or no testing requested)!") os.Exit(1) } + cloudConfig := &e2e.CloudConfig{ + ProjectID: *gceProject, + Zone: *gceZone, + MasterName: *masterName, + } + + if *provider == "aws" { + awsConfig := "[Global]\n" + awsConfig += fmt.Sprintf("Region=%s\n", *gceZone) + + var err error + cloudConfig.Provider, err = cloudprovider.GetCloudProvider(*provider, strings.NewReader(awsConfig)) + if err != nil { + glog.Error("Error building AWS provider: %v", err) + os.Exit(1) + } + } + e2e.RunE2ETests(context, *orderseed, *times, *reportDir, testList) } diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index 809f6540ee7..bd522f7cf5e 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -25,6 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network/exec" // Volume plugins "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/aws_pd" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/empty_dir" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/gce_pd" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/git_repo" @@ -49,6 +50,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin { // The list of plugins to probe is decided by the kubelet binary, not // by dynamic linking or other "magic". Plugins will be analyzed and // initialized later. + allPlugins = append(allPlugins, aws_pd.ProbeVolumePlugins()...) allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...) allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...) allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...) diff --git a/pkg/api/types.go b/pkg/api/types.go index cdc2ca5bed6..09354be3dcc 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -189,6 +189,9 @@ type VolumeSource struct { // GCEPersistentDisk represents a GCE Disk resource that is attached to a // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk"` + // AWSPersistentDisk represents a AWS EBS disk that is attached to a + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk"` // GitRepo represents a git repository at a particular revision. GitRepo *GitRepoVolumeSource `json:"gitRepo"` // Secret represents a secret that should populate this volume. @@ -208,6 +211,9 @@ type PersistentVolumeSource struct { // GCEPersistentDisk represents a GCE Disk resource that is attached to a // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk"` + // AWSPersistentDisk represents a AWS EBS disk that is attached to a + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk"` // HostPath represents a directory on the host. // This is useful for development and testing only. // on-host storage is not supported in any way @@ -384,11 +390,31 @@ type ISCSIVolumeSource struct { IQN string `json:"iqn,omitempty"` // Required: iSCSI target lun number Lun int `json:"lun,omitempty"` + // Optional: Defaults to false (read/write). ReadOnly here will force + // the ReadOnly setting in VolumeMounts. + ReadOnly bool `json:"readOnly,omitempty"` +} + +// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS. +// +// An AWS EBS disk must exist and be formatted before mounting to a container. +// The disk must also be in the same AWS zone as the kubelet. +// A AWS EBS disk can only be mounted as read/write once. +type AWSPersistentDiskVolumeSource struct { + // Unique name of the PD resource. Used to identify the disk in AWS + PDName string `json:"pdName"` + // Required: Filesystem type to mount. // Must be a filesystem type supported by the host operating system. // Ex. "ext4", "xfs", "ntfs" // TODO: how do we prevent errors in the filesystem from compromising the machine FSType string `json:"fsType,omitempty"` + + // Optional: Partition on the disk to mount. + // If omitted, kubelet will attempt to mount the device name. + // Ex. For /dev/sda1, this field is "1", for /dev/sda, this field is 0 or empty. + Partition int `json:"partition,omitempty"` + // Optional: Defaults to false (read/write). ReadOnly here will force // the ReadOnly setting in VolumeMounts. ReadOnly bool `json:"readOnly,omitempty"` diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index f3860947094..320d4634863 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -1170,6 +1170,9 @@ func init() { if err := s.Convert(&in.ISCSI, &out.ISCSI, 0); err != nil { return err } + if err := s.Convert(&in.AWSPersistentDisk, &out.AWSPersistentDisk, 0); err != nil { + return err + } if err := s.Convert(&in.HostPath, &out.HostDir, 0); err != nil { return err } @@ -1194,7 +1197,11 @@ func init() { if err := s.Convert(&in.GCEPersistentDisk, &out.GCEPersistentDisk, 0); err != nil { return err } +<<<<<<< HEAD if err := s.Convert(&in.ISCSI, &out.ISCSI, 0); err != nil { +======= + if err := s.Convert(&in.AWSPersistentDisk, &out.AWSPersistentDisk, 0); err != nil { +>>>>>>> Add initial support for Volumes to AWS return err } if err := s.Convert(&in.HostDir, &out.HostPath, 0); err != nil { diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 23f97766b9a..e9d7b1d2d9c 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -88,7 +88,7 @@ type Volume struct { // Source represents the location and type of a volume to mount. // This is optional for now. If not specified, the Volume is implied to be an EmptyDir. // This implied behavior is deprecated and will be removed in a future version. - Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, or GitRepo; default is EmptyDir"` + Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, AWSPersistentDisk, or GitRepo; default is EmptyDir"` } // VolumeSource represents the source location of a volume to mount. @@ -105,6 +105,9 @@ type VolumeSource struct { // GCEPersistentDisk represents a GCE Disk resource that is attached to a // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource attached to the host machine on demand"` + // AWSPersistentDisk represents a AWS Disk resource that is attached to a + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource attached to the host machine on demand"` // GitRepo represents a git repository at a particular revision. GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"` // Secret represents a secret to populate the volume with @@ -124,6 +127,9 @@ type PersistentVolumeSource struct { // GCEPersistentDisk represents a GCE Disk resource that is attached to a // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource provisioned by an admin"` + // AWSPersistentDisk represents a AWS EBS volume that is attached to a + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource provisioned by an admin"` // HostPath represents a directory on the host. // This is useful for development and testing only. // on-host storage is not supported in any way. @@ -302,6 +308,29 @@ type ISCSIVolumeSource struct { ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"` } +// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS. +// +// An AWS PD must exist and be formatted before mounting to a container. +// The disk must also be in the same AWS zone as the kubelet. +// A AWS PD can only be mounted on a single machine. +type AWSPersistentDiskVolumeSource struct { + // Unique name of the PD resource. Used to identify the disk in AWS + PDName string `json:"pdName" description:"unique id of the PD resource in AWS"` + // Required: Filesystem type to mount. + // Must be a filesystem type supported by the host operating system. + // Ex. "ext4", "xfs", "ntfs" + // TODO: how do we prevent errors in the filesystem from compromising the machine + // TODO: why omitempty if required? + FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"` + // Optional: Partition on the disk to mount. + // If omitted, kubelet will attempt to mount the device name. + // Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty. + Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"` + // Optional: Defaults to false (read/write). ReadOnly here will force + // the ReadOnly setting in VolumeMounts. + ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"` +} + // GitRepoVolumeSource represents a volume that is pulled from git when the pod is created. type GitRepoVolumeSource struct { // Repository URL diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index 92d74ebfc20..4d75080c4d3 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -1097,6 +1097,9 @@ func init() { if err := s.Convert(&in.GCEPersistentDisk, &out.GCEPersistentDisk, 0); err != nil { return err } + if err := s.Convert(&in.AWSPersistentDisk, &out.AWSPersistentDisk, 0); err != nil { + return err + } if err := s.Convert(&in.HostPath, &out.HostDir, 0); err != nil { return err } diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 6b537948832..6898310da53 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -55,7 +55,7 @@ type Volume struct { // Source represents the location and type of a volume to mount. // This is optional for now. If not specified, the Volume is implied to be an EmptyDir. // This implied behavior is deprecated and will be removed in a future version. - Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, or GitRepo; default is EmptyDir"` + Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, AWSPersistentDisk, or GitRepo; default is EmptyDir"` } // VolumeSource represents the source location of a volume to mount. @@ -74,6 +74,9 @@ type VolumeSource struct { // A persistent disk that is mounted to the // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource attached to the host machine on demand"` + // An AWS persistent disk that is mounted to the + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource attached to the host machine on demand"` // GitRepo represents a git repository at a particular revision. GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"` // Secret is a secret to populate the volume with @@ -93,6 +96,9 @@ type PersistentVolumeSource struct { // GCEPersistentDisk represents a GCE Disk resource that is attached to a // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource provisioned by an admin"` + // AWSPersistentDisk represents a AWS Disk resource that is attached to a + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource provisioned by an admin"` // HostPath represents a directory on the host. // This is useful for development and testing only. // on-host storage is not supported in any way. @@ -284,6 +290,29 @@ type GCEPersistentDiskVolumeSource struct { ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"` } +// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS. +// +// An AWS PD must exist and be formatted before mounting to a container. +// The disk must also be in the same AWS zone as the kubelet. +// A AWS PD can only be mounted on a single machine. +type AWSPersistentDiskVolumeSource struct { + // Unique name of the PD resource. Used to identify the disk in AWS + PDName string `json:"pdName" description:"unique id of the PD resource in AWS"` + // Required: Filesystem type to mount. + // Must be a filesystem type supported by the host operating system. + // Ex. "ext4", "xfs", "ntfs" + // TODO: how do we prevent errors in the filesystem from compromising the machine + // TODO: why omitempty if required? + FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"` + // Optional: Partition on the disk to mount. + // If omitted, kubelet will attempt to mount the device name. + // Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty. + Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"` + // Optional: Defaults to false (read/write). ReadOnly here will force + // the ReadOnly setting in VolumeMounts. + ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"` +} + // GitRepoVolumeSource represents a volume that is pulled from git when the pod is created. type GitRepoVolumeSource struct { // Repository URL diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 3acbbc17ce9..d803eedacc2 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -206,6 +206,9 @@ type VolumeSource struct { // GCEPersistentDisk represents a GCE Disk resource that is attached to a // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk" description:"GCE disk resource attached to the host machine on demand"` + // AWSPersistentDisk represents a AWS Disk resource that is attached to a + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource attached to the host machine on demand"` // GitRepo represents a git repository at a particular revision. GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"` // Secret represents a secret that should populate this volume. @@ -225,6 +228,9 @@ type PersistentVolumeSource struct { // GCEPersistentDisk represents a GCE Disk resource that is attached to a // kubelet's host machine and then exposed to the pod. GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk" description:"GCE disk resource provisioned by an admin"` + // AWSPersistentDisk represents a AWS Disk resource that is attached to a + // kubelet's host machine and then exposed to the pod. + AWSPersistentDisk *AWSPersistentDiskVolumeSource `json:"awsPersistentDisk" description:"AWS disk resource provisioned by an admin"` // HostPath represents a directory on the host. // This is useful for development and testing only. // on-host storage is not supported in any way. @@ -400,6 +406,29 @@ type GCEPersistentDiskVolumeSource struct { ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"` } +// AWSPersistentDiskVolumeSource represents a Persistent Disk resource in AWS. +// +// An AWS PD must exist and be formatted before mounting to a container. +// The disk must also be in the same AWS zone as the kubelet. +// A AWS PD can only be mounted on a single machine. +type AWSPersistentDiskVolumeSource struct { + // Unique name of the PD resource. Used to identify the disk in AWS + PDName string `json:"pdName" description:"unique id of the PD resource in AWS"` + // Required: Filesystem type to mount. + // Must be a filesystem type supported by the host operating system. + // Ex. "ext4", "xfs", "ntfs" + // TODO: how do we prevent errors in the filesystem from compromising the machine + // TODO: why omitempty if required? + FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"` + // Optional: Partition on the disk to mount. + // If omitted, kubelet will attempt to mount the device name. + // Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty. + Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"` + // Optional: Defaults to false (read/write). ReadOnly here will force + // the ReadOnly setting in VolumeMounts. + ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"` +} + // GitRepoVolumeSource represents a volume that is pulled from git when the pod is created. type GitRepoVolumeSource struct { // Repository URL diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 8c12512612b..d4f23ec8a05 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -299,6 +299,10 @@ func validateSource(source *api.VolumeSource) errs.ValidationErrorList { numVolumes++ allErrs = append(allErrs, validateGCEPersistentDiskVolumeSource(source.GCEPersistentDisk).Prefix("persistentDisk")...) } + if source.AWSPersistentDisk != nil { + numVolumes++ + allErrs = append(allErrs, validateAWSPersistentDiskVolumeSource(source.AWSPersistentDisk).Prefix("awsPersistentDisk")...) + } if source.Secret != nil { numVolumes++ allErrs = append(allErrs, validateSecretVolumeSource(source.Secret).Prefix("secret")...) @@ -368,6 +372,20 @@ func validateGCEPersistentDiskVolumeSource(PD *api.GCEPersistentDiskVolumeSource return allErrs } +func validateAWSPersistentDiskVolumeSource(PD *api.AWSPersistentDiskVolumeSource) errs.ValidationErrorList { + allErrs := errs.ValidationErrorList{} + if PD.PDName == "" { + allErrs = append(allErrs, errs.NewFieldRequired("pdName")) + } + if PD.FSType == "" { + allErrs = append(allErrs, errs.NewFieldRequired("fsType")) + } + if PD.Partition < 0 || PD.Partition > 255 { + allErrs = append(allErrs, errs.NewFieldInvalid("partition", PD.Partition, pdPartitionErrorMsg)) + } + return allErrs +} + func validateSecretVolumeSource(secretSource *api.SecretVolumeSource) errs.ValidationErrorList { allErrs := errs.ValidationErrorList{} if secretSource.SecretName == "" { @@ -426,6 +444,10 @@ func ValidatePersistentVolume(pv *api.PersistentVolume) errs.ValidationErrorList numVolumes++ allErrs = append(allErrs, validateGCEPersistentDiskVolumeSource(pv.Spec.GCEPersistentDisk).Prefix("persistentDisk")...) } + if pv.Spec.AWSPersistentDisk != nil { + numVolumes++ + allErrs = append(allErrs, validateAWSPersistentDiskVolumeSource(pv.Spec.AWSPersistentDisk).Prefix("awsPersistentDisk")...) + } if numVolumes != 1 { allErrs = append(allErrs, errs.NewFieldInvalid("", pv.Spec.PersistentVolumeSource, "exactly 1 volume type is required")) } @@ -1021,6 +1043,7 @@ func ValidateReadOnlyPersistentDisks(volumes []api.Volume) errs.ValidationErrorL allErrs = append(allErrs, errs.NewFieldInvalid("GCEPersistentDisk.ReadOnly", false, "ReadOnly must be true for replicated pods > 1, as GCE PD can only be mounted on multiple machines if it is read-only.")) } } + // TODO: What to do for AWS? It doesn't support replicas } return allErrs } diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 1076f97087f..a922befb34f 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -516,6 +516,7 @@ func TestValidateVolumes(t *testing.T) { {Name: "abc-123", VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{"/mnt/path3"}}}, {Name: "empty", VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}}, {Name: "gcepd", VolumeSource: api.VolumeSource{GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{"my-PD", "ext4", 1, false}}}, + {Name: "awspd", VolumeSource: api.VolumeSource{AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{"my-PD", "ext4", 1, false}}}, {Name: "gitrepo", VolumeSource: api.VolumeSource{GitRepo: &api.GitRepoVolumeSource{"my-repo", "hashstring"}}}, {Name: "iscsidisk", VolumeSource: api.VolumeSource{ISCSI: &api.ISCSIVolumeSource{"127.0.0.1", "iqn.2015-02.example.com:test", 1, "ext4", false}}}, {Name: "secret", VolumeSource: api.VolumeSource{Secret: &api.SecretVolumeSource{"my-secret"}}}, diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 9eb7fe7929e..c804200d2f1 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -17,11 +17,15 @@ limitations under the License. package aws_cloud import ( + "errors" "fmt" "io" "net" + "net/url" "regexp" "strings" + "sync" + "time" "code.google.com/p/gcfg" "github.com/mitchellh/goamz/aws" @@ -38,6 +42,19 @@ import ( type EC2 interface { // Query EC2 for instances matching the filter Instances(instIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) + + + // Attach a volume to an instance + AttachVolume(volumeId string, instanceId string, mountDevice string) (resp *ec2.AttachVolumeResp, err error) + // Detach a volume from whatever instance it is attached to + // TODO: We should specify the InstanceID and the Device, for safety + DetachVolume(volumeId string) (resp *ec2.SimpleResp, err error) + // Lists volumes + Volumes(volumeIds []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) + // Create an EBS volume + CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) + // Delete an EBS volume + DeleteVolume(volumeId string) (resp *ec2.SimpleResp, err error) } // Abstraction over the AWS metadata service @@ -46,12 +63,34 @@ type AWSMetadata interface { GetMetaData(key string) ([]byte, error) } +type VolumeOptions struct { + CapacityMB int + Zone string +} + +// Volumes is an interface for managing cloud-provisioned volumes +type Volumes interface { + // Attach the disk to the specified instance + // instanceName can be empty to mean "the instance on which we are running" + AttachDisk(instanceName string, volumeName string, readOnly bool) error + // Detach the disk from the specified instance + // instanceName can be empty to mean "the instance on which we are running" + DetachDisk(instanceName string, volumeName string) error + + // Create a volume with the specified options + CreateVolume(volumeOptions *VolumeOptions) (volumeName string, err error) + DeleteVolume(volumeName string) error +} + // AWSCloud is an implementation of Interface, TCPLoadBalancer and Instances for Amazon Web Services. type AWSCloud struct { ec2 EC2 cfg *AWSCloudConfig availabilityZone string region aws.Region + + // The AWS instance that we are running on + selfAwsInstance *awsInstance } type AWSCloudConfig struct { @@ -76,12 +115,12 @@ func (f *ec2InstanceFilter) Matches(instance ec2.Instance) bool { } // goamzEC2 is an implementation of the EC2 interface, backed by goamz -type GoamzEC2 struct { +type goamzEC2 struct { ec2 *ec2.EC2 } // Implementation of EC2.Instances -func (self *GoamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) { +func (self *goamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) { var goamzFilter *ec2.Filter if filter != nil { goamzFilter = ec2.NewFilter() @@ -106,6 +145,26 @@ func (self *goamzMetadata) GetMetaData(key string) ([]byte, error) { type AuthFunc func() (auth aws.Auth, err error) +func (s *goamzEC2) AttachVolume(volumeId string, instanceId string, device string) (resp *ec2.AttachVolumeResp, err error) { + return s.ec2.AttachVolume(volumeId, instanceId, device) +} + +func (s *goamzEC2) DetachVolume(volumeId string) (resp *ec2.SimpleResp, err error) { + return s.ec2.DetachVolume(volumeId) +} + +func (s *goamzEC2) Volumes(volumeIds []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) { + return s.ec2.Volumes(volumeIds, filter) +} + +func (s *goamzEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) { + return s.ec2.CreateVolume(request) +} + +func (s *goamzEC2) DeleteVolume(volumeId string) (resp *ec2.SimpleResp, err error) { + return s.ec2.DeleteVolume(volumeId) +} + func init() { cloudprovider.RegisterCloudProvider("aws", func(config io.Reader) (cloudprovider.Interface, error) { metadata := &goamzMetadata{} @@ -179,12 +238,23 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW return nil, fmt.Errorf("not a valid AWS zone (unknown region): %s", zone) } - return &AWSCloud{ - ec2: &GoamzEC2{ec2: ec2.New(auth, region)}, - cfg: cfg, + ec2 := &goamzEC2{ec2: ec2.New(auth, region)} + + instanceId, err := ec2.GetMetaData("instance-id") + if err != nil { + return nil, fmt.Errorf("error fetching instance-id from ec2 metadata service: %v", err) + } + + awsCloud := &AWSCloud{ + ec2: ec2, + cfg: cfg, region: region, availabilityZone: zone, - }, nil + } + + awsCloud.selfAwsInstance = newAwsInstance(ec2, string(instanceId)) + + return awsCloud, nil } func (aws *AWSCloud) Clusters() (cloudprovider.Clusters, bool) { @@ -377,7 +447,7 @@ func getResourcesByInstanceType(instanceType string) (*api.NodeResources, error) return makeNodeResources("t1", 0.125, 0.615) // t2: Burstable - // TODO: The ECUs are fake values (because they are burstable), so this is just a guess... + // TODO: The ECUs are fake values (because they are burstable), so this is just a guess... case "t2.micro": return makeNodeResources("t2", 0.25, 1) case "t2.small": @@ -506,3 +576,345 @@ func (self *AWSCloud) GetZone() (cloudprovider.Zone, error) { Region: self.region.Name, }, nil } + +// Abstraction around AWS Instance Types +// There isn't an API to get information for a particular instance type (that I know of) +type awsInstanceType struct { +} + +// TODO: Also return number of mounts allowed? +func (self *awsInstanceType) getEbsMountDevices() []string { + // See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html + devices := []string{} + for c := 'f'; c <= 'p'; c++ { + devices = append(devices, fmt.Sprintf("/dev/sd%s", c)) + } + return devices +} + +type awsInstance struct { + ec2 EC2 + + // id in AWS + awsId string + + mutex sync.Mutex + + // We must cache because otherwise there is a race condition, + // where we assign a device mapping and then get a second request before we attach the volume + deviceMappings map[string]string +} + +func newAwsInstance(ec2 EC2, awsId string) *awsInstance { + self := &awsInstance{ec2: ec2, awsId: awsId} + + // We lazy-init deviceMappings + self.deviceMappings = nil + + return self +} + +// Gets the awsInstanceType that models the instance type of this instance +func (self *awsInstance) getInstanceType() *awsInstanceType { + // TODO: Make this real + awsInstanceType := &awsInstanceType{} + return awsInstanceType +} + +// Gets the full information about this instance from the EC2 API +func (self *awsInstance) getInfo() (*ec2.Instance, error) { + resp, err := self.ec2.Instances([]string{self.awsId}, nil) + if err != nil { + return nil, fmt.Errorf("error querying ec2 for instance info: %v", err) + } + if len(resp.Reservations) == 0 { + return nil, fmt.Errorf("no reservations found for instance: %s", self.awsId) + } + if len(resp.Reservations) > 1 { + return nil, fmt.Errorf("multiple reservations found for instance: %s", self.awsId) + } + if len(resp.Reservations[0].Instances) == 0 { + return nil, fmt.Errorf("no instances found for instance: %s", self.awsId) + } + if len(resp.Reservations[0].Instances) > 1 { + return nil, fmt.Errorf("multiple instances found for instance: %s", self.awsId) + } + return &resp.Reservations[0].Instances[0], nil +} + +func (self *awsInstance) assignMountDevice(volumeId string) (string, error) { + instanceType := self.getInstanceType() + if instanceType == nil { + return "", fmt.Errorf("could not get instance type for instance: %s", self.awsId) + } + + // We lock to prevent concurrent mounts from conflicting + // We may still conflict if someone calls the API concurrently, + // but the AWS API will then fail one of the two attach operations + self.mutex.Lock() + defer self.mutex.Unlock() + + // We cache both for efficiency and correctness + if self.deviceMappings == nil { + info, err := self.getInfo() + if err != nil { + return "", err + } + deviceMappings := map[string]string{} + for _, blockDevice := range info.BlockDevices { + deviceMappings[blockDevice.DeviceName] = blockDevice.VolumeId + } + self.deviceMappings = deviceMappings + } + + // Check to see if this volume is already assigned a device on this machine + for deviceName, mappingVolumeId := range self.deviceMappings { + if volumeId == mappingVolumeId { + glog.Warningf("Got assignment call for already-assigned volume: %s@%s", deviceName, mappingVolumeId) + return deviceName, nil + } + } + + // Check all the valid mountpoints to see if any of them are free + valid := instanceType.getEbsMountDevices() + chosen := "" + for _, device := range valid { + _, found := self.deviceMappings[device] + if !found { + chosen = device + break + } + } + + if chosen == "" { + glog.Warningf("Could not assign a mount device (all in use?). mappings=%v, valid=%v", self.deviceMappings, valid) + return "", nil + } + + self.deviceMappings[chosen] = volumeId + glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeId) + + return chosen, nil +} + +func (self *awsInstance) releaseMountDevice(volumeId string, mountDevice string) { + self.mutex.Lock() + defer self.mutex.Unlock() + + existingVolumeId, found := self.deviceMappings[mountDevice] + if !found { + glog.Errorf("releaseMountDevice on non-allocated device") + return + } + if volumeId != existingVolumeId { + glog.Errorf("releaseMountDevice on device assigned to different volume") + return + } + glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeId) + delete(self.deviceMappings, mountDevice) +} + +type awsDisk struct { + ec2 EC2 + + // Name in k8s + name string + // id in AWS + awsId string + // az which holds the volume + az string +} + +func newAwsDisk(ec2 EC2, name string) (*awsDisk, error) { + // name looks like aws://availability-zone/id + url, err := url.Parse(name) + if err != nil { + // TODO: Maybe we should pass a URL into the Volume functions + return nil, fmt.Errorf("Invalid disk name (%s): %v", name, err) + } + if url.Scheme != "aws" { + return nil, fmt.Errorf("Invalid scheme for AWS volume (%s)", name) + } + awsId := url.Path + // TODO: Regex match? + if strings.Contains(awsId, "/") || !strings.HasPrefix(awsId, "vol-") { + return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name) + } + az := url.Host + // TODO: Better validation? + // TODO: Default to our AZ? Look it up? + // TODO: Should this be a region or an AZ? + if az == "" { + return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name) + } + disk := &awsDisk{ec2: ec2, name: name, awsId: awsId, az: az} + return disk, nil +} + +// Gets the full information about this volume from the EC2 API +func (self *awsDisk) getInfo() (*ec2.Volume, error) { + resp, err := self.ec2.Volumes([]string{self.awsId}, nil) + if err != nil { + return nil, fmt.Errorf("error querying ec2 for volume info: %v", err) + } + if len(resp.Volumes) == 0 { + return nil, fmt.Errorf("no volumes found for volume: %s", self.awsId) + } + if len(resp.Volumes) > 1 { + return nil, fmt.Errorf("multiple volumes found for volume: %s", self.awsId) + } + return &resp.Volumes[0], nil +} + +func (self *awsDisk) waitForAttachmentStatus(status string) error { + attempt := 0 + maxAttempts := 60 + + for { + info, err := self.getInfo() + if err != nil { + return err + } + if len(info.Attachments) > 1 { + glog.Warningf("Found multiple attachments for volume: %v", info) + } + attachmentStatus := "" + for _, attachment := range info.Attachments { + if attachment.Status == status { + return nil + } + attachmentStatus = attachment.Status + } + glog.V(2).Infof("Waiting for volume state: actual=%s, desired=%s", attachmentStatus, status) + + attempt++ + if attempt > maxAttempts { + glog.Warningf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status) + return errors.New("Timeout waiting for volume state") + } + + time.Sleep(1 * time.Second) + } +} + +// Deletes the EBS disk +func (self *awsDisk) delete() error { + _, err := self.ec2.DeleteVolume(self.awsId) + if err != nil { + return fmt.Errorf("error delete EBS volumes: %v", err) + } + return nil +} + +// Gets the awsInstance for the EC2 instance on which we are running +// may return nil in case of error +func (aws *AWSCloud) getSelfAwsInstance() *awsInstance { + // Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance + return aws.selfAwsInstance +} + +// Implements Volumes.AttachDisk +func (aws *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly bool) error { + disk, err := newAwsDisk(aws.ec2, diskName) + if err != nil { + return err + } + + var awsInstance *awsInstance + if instanceName == "" { + awsInstance = aws.selfAwsInstance + } else { + instance, err := aws.getInstancesByDnsName(instanceName) + if err != nil { + return fmt.Errorf("Error finding instance: %v", err) + } + + awsInstance = newAwsInstance(aws.ec2, instance.InstanceId) + } + + if readOnly { + // TODO: We could enforce this when we mount the volume (?) + // TODO: We could also snapshot the volume and attach copies of it + return errors.New("AWS volumes cannot be mounted read-only") + } + + mountDevice, err := awsInstance.assignMountDevice(disk.awsId) + if err != nil { + return err + } + + attached := false + defer func() { + if !attached { + awsInstance.releaseMountDevice(disk.awsId, mountDevice) + } + }() + + attachResponse, err := aws.ec2.AttachVolume(disk.awsId, awsInstance.awsId, mountDevice) + if err != nil { + // TODO: Check if already attached? + return fmt.Errorf("Error attaching EBS volume: %v", err) + } + + glog.V(2).Info("AttachVolume request returned %v", attachResponse) + + err = disk.waitForAttachmentStatus("attached") + if err != nil { + return err + } + + attached = true + + // TODO: Return device name (and note that it might look like /dev/xvdf, not /dev/sdf) + return nil +} + +// Implements Volumes.DetachDisk +func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error { + disk, err := newAwsDisk(aws.ec2, diskName) + if err != nil { + return err + } + + // TODO: We should specify the InstanceID and the Device, for safety + response, err := aws.ec2.DetachVolume(disk.awsId) + if err != nil { + return fmt.Errorf("error detaching EBS volume: %v", err) + } + if response == nil { + return errors.New("no response from DetachVolume") + } + err = disk.waitForAttachmentStatus("detached") + if err != nil { + return err + } + + return err +} + +// Implements Volumes.CreateVolume +func (aws *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) { + request := &ec2.CreateVolume{} + request.AvailZone = volumeOptions.Zone + request.Size = (int64(volumeOptions.CapacityMB) + 1023) / 1024 + response, err := aws.ec2.CreateVolume(request) + if err != nil { + return "", err + } + + az := response.AvailZone + awsId := response.VolumeId + + volumeName := "aws://" + az + "/" + awsId + + return volumeName, nil +} + +// Implements Volumes.DeleteVolume +func (aws *AWSCloud) DeleteVolume(volumeName string) error { + awsDisk, err := newAwsDisk(aws.ec2, volumeName) + if err != nil { + return err + } + return awsDisk.delete() +} diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index ce8c94cadc6..0eccffe88a1 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -50,16 +50,26 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) { } func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { - if volume.GCEPersistentDisk == nil { - return false - } - pdName := volume.GCEPersistentDisk.PDName + if volume.GCEPersistentDisk != nil { + pdName := volume.GCEPersistentDisk.PDName - manifest := &(pod.Spec) - for ix := range manifest.Volumes { - if manifest.Volumes[ix].GCEPersistentDisk != nil && - manifest.Volumes[ix].GCEPersistentDisk.PDName == pdName { - return true + manifest := &(pod.Spec) + for ix := range manifest.Volumes { + if manifest.Volumes[ix].GCEPersistentDisk != nil && + manifest.Volumes[ix].GCEPersistentDisk.PDName == pdName { + return true + } + } + } + if volume.AWSPersistentDisk != nil { + pdName := volume.AWSPersistentDisk.PDName + + manifest := &(pod.Spec) + for ix := range manifest.Volumes { + if manifest.Volumes[ix].AWSPersistentDisk != nil && + manifest.Volumes[ix].AWSPersistentDisk.PDName == pdName { + return true + } } } return false diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 598cc527760..b2c8818d2a3 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -321,6 +321,55 @@ func TestDiskConflicts(t *testing.T) { } } +func TestAWSDiskConflicts(t *testing.T) { + volState := api.PodSpec{ + Volumes: []api.Volume{ + { + VolumeSource: api.VolumeSource{ + AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{ + PDName: "foo", + }, + }, + }, + }, + } + volState2 := api.PodSpec{ + Volumes: []api.Volume{ + { + VolumeSource: api.VolumeSource{ + AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{ + PDName: "bar", + }, + }, + }, + }, + } + tests := []struct { + pod api.Pod + existingPods []api.Pod + isOk bool + test string + }{ + {api.Pod{}, []api.Pod{}, true, "nothing"}, + {api.Pod{}, []api.Pod{{Spec: volState}}, true, "one state"}, + {api.Pod{Spec: volState}, []api.Pod{{Spec: volState}}, false, "same state"}, + {api.Pod{Spec: volState2}, []api.Pod{{Spec: volState}}, true, "different state"}, + } + + for _, test := range tests { + ok, err := NoDiskConflict(test.pod, test.existingPods, "machine") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if test.isOk && !ok { + t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test) + } + if !test.isOk && ok { + t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test) + } + } +} + func TestPodFitsSelector(t *testing.T) { tests := []struct { pod api.Pod diff --git a/pkg/volume/aws_pd/aws_pd.go b/pkg/volume/aws_pd/aws_pd.go new file mode 100644 index 00000000000..cb48b07cd87 --- /dev/null +++ b/pkg/volume/aws_pd/aws_pd.go @@ -0,0 +1,287 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_pd + +import ( + "fmt" + "os" + "path" + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/golang/glog" +) + +// This is the primary entrypoint for volume plugins. +func ProbeVolumePlugins() []volume.VolumePlugin { + return []volume.VolumePlugin{&awsPersistentDiskPlugin{nil}} +} + +type awsPersistentDiskPlugin struct { + host volume.VolumeHost +} + +var _ volume.VolumePlugin = &awsPersistentDiskPlugin{} + +const ( + awsPersistentDiskPluginName = "kubernetes.io/aws-pd" +) + +func (plugin *awsPersistentDiskPlugin) Init(host volume.VolumeHost) { + plugin.host = host +} + +func (plugin *awsPersistentDiskPlugin) Name() string { + return awsPersistentDiskPluginName +} + +func (plugin *awsPersistentDiskPlugin) CanSupport(spec *api.Volume) bool { + if spec.AWSPersistentDisk != nil { + return true + } + return false +} + +func (plugin *awsPersistentDiskPlugin) GetAccessModes() []api.AccessModeType { + return []api.AccessModeType{ + api.ReadWriteOnce, + } +} + +func (plugin *awsPersistentDiskPlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) { + // Inject real implementations here, test through the internal function. + return plugin.newBuilderInternal(spec, podRef.UID, &AWSDiskUtil{}, mount.New()) +} + +func (plugin *awsPersistentDiskPlugin) newBuilderInternal(spec *api.Volume, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Builder, error) { + pdName := spec.AWSPersistentDisk.PDName + fsType := spec.AWSPersistentDisk.FSType + partition := "" + if spec.AWSPersistentDisk.Partition != 0 { + partition = strconv.Itoa(spec.AWSPersistentDisk.Partition) + } + readOnly := spec.AWSPersistentDisk.ReadOnly + + return &awsPersistentDisk{ + podUID: podUID, + volName: spec.Name, + pdName: pdName, + fsType: fsType, + partition: partition, + readOnly: readOnly, + manager: manager, + mounter: mounter, + diskMounter: &awsSafeFormatAndMount{mounter, exec.New()}, + plugin: plugin, + }, nil +} + +func (plugin *awsPersistentDiskPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) { + // Inject real implementations here, test through the internal function. + return plugin.newCleanerInternal(volName, podUID, &AWSDiskUtil{}, mount.New()) +} + +func (plugin *awsPersistentDiskPlugin) newCleanerInternal(volName string, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Cleaner, error) { + return &awsPersistentDisk{ + podUID: podUID, + volName: volName, + manager: manager, + mounter: mounter, + diskMounter: &awsSafeFormatAndMount{mounter, exec.New()}, + plugin: plugin, + }, nil +} + +// Abstract interface to PD operations. +type pdManager interface { + // Attaches the disk to the kubelet's host machine. + AttachAndMountDisk(pd *awsPersistentDisk, globalPDPath string) error + // Detaches the disk from the kubelet's host machine. + DetachDisk(pd *awsPersistentDisk) error +} + +// awsPersistentDisk volumes are disk resources provided by Google Compute Engine +// that are attached to the kubelet's host machine and exposed to the pod. +type awsPersistentDisk struct { + volName string + podUID types.UID + // Unique name of the PD, used to find the disk resource in the provider. + pdName string + // Filesystem type, optional. + fsType string + // Specifies the partition to mount + partition string + // Specifies whether the disk will be attached as read-only. + readOnly bool + // Utility interface that provides API calls to the provider to attach/detach disks. + manager pdManager + // Mounter interface that provides system calls to mount the global path to the pod local path. + mounter mount.Interface + // diskMounter provides the interface that is used to mount the actual block device. + diskMounter mount.Interface + plugin *awsPersistentDiskPlugin +} + +func detachDiskLogError(pd *awsPersistentDisk) { + err := pd.manager.DetachDisk(pd) + if err != nil { + glog.Warningf("Failed to detach disk: %v (%v)", pd, err) + } +} + +// getVolumeProvider returns the AWS Volumes interface +func (pd *awsPersistentDisk) getVolumeProvider() (aws_cloud.Volumes, error) { + name := "aws" + cloud, err := cloudprovider.GetCloudProvider(name, nil) + if err != nil { + return nil, err + } + volumes, ok := cloud.(aws_cloud.Volumes) + if !ok { + return nil, fmt.Errorf("Cloud provider does not support volumes") + } + return volumes, nil +} + +// SetUp attaches the disk and bind mounts to the volume path. +func (pd *awsPersistentDisk) SetUp() error { + return pd.SetUpAt(pd.GetPath()) +} + +// SetUpAt attaches the disk and bind mounts to the volume path. +func (pd *awsPersistentDisk) SetUpAt(dir string) error { + // TODO: handle failed mounts here. + mountpoint, err := mount.IsMountPoint(dir) + glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, mountpoint, err) + if err != nil && !os.IsNotExist(err) { + return err + } + if mountpoint { + return nil + } + + globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName) + if err := pd.manager.AttachAndMountDisk(pd, globalPDPath); err != nil { + return err + } + + flags := uintptr(0) + if pd.readOnly { + flags = mount.FlagReadOnly + } + + if err := os.MkdirAll(dir, 0750); err != nil { + // TODO: we should really eject the attach/detach out into its own control loop. + detachDiskLogError(pd) + return err + } + + // Perform a bind mount to the full path to allow duplicate mounts of the same PD. + err = pd.mounter.Mount(globalPDPath, dir, "", mount.FlagBind|flags, "") + if err != nil { + mountpoint, mntErr := mount.IsMountPoint(dir) + if mntErr != nil { + glog.Errorf("isMountpoint check failed: %v", mntErr) + return err + } + if mountpoint { + if mntErr = pd.mounter.Unmount(dir, 0); mntErr != nil { + glog.Errorf("Failed to unmount: %v", mntErr) + return err + } + mountpoint, mntErr := mount.IsMountPoint(dir) + if mntErr != nil { + glog.Errorf("isMountpoint check failed: %v", mntErr) + return err + } + if mountpoint { + // This is very odd, we don't expect it. We'll try again next sync loop. + glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir) + return err + } + } + os.Remove(dir) + // TODO: we should really eject the attach/detach out into its own control loop. + detachDiskLogError(pd) + return err + } + + return nil +} + +func makeGlobalPDName(host volume.VolumeHost, devName string) string { + return path.Join(host.GetPluginDir(awsPersistentDiskPluginName), "mounts", devName) +} + +func (pd *awsPersistentDisk) GetPath() string { + name := awsPersistentDiskPluginName + return pd.plugin.host.GetPodVolumeDir(pd.podUID, util.EscapeQualifiedNameForDisk(name), pd.volName) +} + +// Unmounts the bind mount, and detaches the disk only if the PD +// resource was the last reference to that disk on the kubelet. +func (pd *awsPersistentDisk) TearDown() error { + return pd.TearDownAt(pd.GetPath()) +} + +// Unmounts the bind mount, and detaches the disk only if the PD +// resource was the last reference to that disk on the kubelet. +func (pd *awsPersistentDisk) TearDownAt(dir string) error { + mountpoint, err := mount.IsMountPoint(dir) + if err != nil { + return err + } + if !mountpoint { + return os.Remove(dir) + } + + refs, err := mount.GetMountRefs(pd.mounter, dir) + if err != nil { + return err + } + // Unmount the bind-mount inside this pod + if err := pd.mounter.Unmount(dir, 0); err != nil { + return err + } + // If len(refs) is 1, then all bind mounts have been removed, and the + // remaining reference is the global mount. It is safe to detach. + if len(refs) == 1 { + // pd.pdName is not initially set for volume-cleaners, so set it here. + pd.pdName = path.Base(refs[0]) + if err := pd.manager.DetachDisk(pd); err != nil { + return err + } + } + mountpoint, mntErr := mount.IsMountPoint(dir) + if mntErr != nil { + glog.Errorf("isMountpoint check failed: %v", mntErr) + return err + } + if !mountpoint { + if err := os.Remove(dir); err != nil { + return err + } + } + return nil +} diff --git a/pkg/volume/aws_pd/aws_pd_test.go b/pkg/volume/aws_pd/aws_pd_test.go new file mode 100644 index 00000000000..d3cff633ca1 --- /dev/null +++ b/pkg/volume/aws_pd/aws_pd_test.go @@ -0,0 +1,153 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_pd + +import ( + "os" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" +) + +func TestCanSupport(t *testing.T) { + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-pd") + if err != nil { + t.Errorf("Can't find the plugin by name") + } + if plug.Name() != "kubernetes.io/aws-pd" { + t.Errorf("Wrong name: %s", plug.Name()) + } + if !plug.CanSupport(&api.Volume{VolumeSource: api.VolumeSource{AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{}}}) { + t.Errorf("Expected true") + } +} + +func TestGetAccessModes(t *testing.T) { + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/aws-pd") + if err != nil { + t.Errorf("Can't find the plugin by name") + } + if !contains(plug.GetAccessModes(), api.ReadWriteOnce) || !contains(plug.GetAccessModes(), api.ReadOnlyMany) { + t.Errorf("Expected two AccessModeTypes: %s and %s", api.ReadWriteOnce, api.ReadOnlyMany) + } +} + +func contains(modes []api.AccessModeType, mode api.AccessModeType) bool { + for _, m := range modes { + if m == mode { + return true + } + } + return false +} + +type fakePDManager struct{} + +// TODO(jonesdl) To fully test this, we could create a loopback device +// and mount that instead. +func (fake *fakePDManager) AttachAndMountDisk(pd *awsPersistentDisk, globalPDPath string) error { + globalPath := makeGlobalPDName(pd.plugin.host, pd.pdName) + err := os.MkdirAll(globalPath, 0750) + if err != nil { + return err + } + return nil +} + +func (fake *fakePDManager) DetachDisk(pd *awsPersistentDisk) error { + globalPath := makeGlobalPDName(pd.plugin.host, pd.pdName) + err := os.RemoveAll(globalPath) + if err != nil { + return err + } + return nil +} + +func TestPlugin(t *testing.T) { + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-pd") + if err != nil { + t.Errorf("Can't find the plugin by name") + } + spec := &api.Volume{ + Name: "vol1", + VolumeSource: api.VolumeSource{ + AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{ + PDName: "pd", + FSType: "ext4", + }, + }, + } + builder, err := plug.(*awsPersistentDiskPlugin).newBuilderInternal(spec, types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{}) + if err != nil { + t.Errorf("Failed to make a new Builder: %v", err) + } + if builder == nil { + t.Errorf("Got a nil Builder: %v") + } + + path := builder.GetPath() + if path != "/tmp/fake/pods/poduid/volumes/kubernetes.io~aws-pd/vol1" { + t.Errorf("Got unexpected path: %s", path) + } + + if err := builder.SetUp(); err != nil { + t.Errorf("Expected success, got: %v", err) + } + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + t.Errorf("SetUp() failed, volume path not created: %s", path) + } else { + t.Errorf("SetUp() failed: %v", err) + } + } + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + t.Errorf("SetUp() failed, volume path not created: %s", path) + } else { + t.Errorf("SetUp() failed: %v", err) + } + } + + cleaner, err := plug.(*awsPersistentDiskPlugin).newCleanerInternal("vol1", types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{}) + if err != nil { + t.Errorf("Failed to make a new Cleaner: %v", err) + } + if cleaner == nil { + t.Errorf("Got a nil Cleaner: %v") + } + + if err := cleaner.TearDown(); err != nil { + t.Errorf("Expected success, got: %v", err) + } + if _, err := os.Stat(path); err == nil { + t.Errorf("TearDown() failed, volume path still exists: %s", path) + } else if !os.IsNotExist(err) { + t.Errorf("SetUp() failed: %v", err) + } +} diff --git a/pkg/volume/aws_pd/aws_util.go b/pkg/volume/aws_pd/aws_util.go new file mode 100644 index 00000000000..f265aebdcb6 --- /dev/null +++ b/pkg/volume/aws_pd/aws_util.go @@ -0,0 +1,140 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_pd + +import ( + "errors" + "fmt" + "os" + "path" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" + "github.com/golang/glog" +) + +type AWSDiskUtil struct{} + +// Attaches a disk specified by a volume.AWSPersistentDisk to the current kubelet. +// Mounts the disk to it's global path. +func (util *AWSDiskUtil) AttachAndMountDisk(pd *awsPersistentDisk, globalPDPath string) error { + volumes, err := pd.getVolumeProvider() + if err != nil { + return err + } + flags := uintptr(0) + if pd.readOnly { + flags = mount.FlagReadOnly + } + if err := volumes.AttachDisk("", pd.pdName, pd.readOnly); err != nil { + return err + } + devicePath := path.Join("/dev/disk/by-id/", "aws-"+pd.pdName) + if pd.partition != "" { + devicePath = devicePath + "-part" + pd.partition + } + //TODO(jonesdl) There should probably be better method than busy-waiting here. + numTries := 0 + for { + _, err := os.Stat(devicePath) + if err == nil { + break + } + if err != nil && !os.IsNotExist(err) { + return err + } + numTries++ + if numTries == 10 { + return errors.New("Could not attach disk: Timeout after 10s") + } + time.Sleep(time.Second) + } + + // Only mount the PD globally once. + mountpoint, err := mount.IsMountPoint(globalPDPath) + if err != nil { + if os.IsNotExist(err) { + if err := os.MkdirAll(globalPDPath, 0750); err != nil { + return err + } + mountpoint = false + } else { + return err + } + } + if !mountpoint { + err = pd.diskMounter.Mount(devicePath, globalPDPath, pd.fsType, flags, "") + if err != nil { + os.Remove(globalPDPath) + return err + } + } + return nil +} + +// Unmounts the device and detaches the disk from the kubelet's host machine. +func (util *AWSDiskUtil) DetachDisk(pd *awsPersistentDisk) error { + // Unmount the global PD mount, which should be the only one. + globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName) + if err := pd.mounter.Unmount(globalPDPath, 0); err != nil { + return err + } + if err := os.Remove(globalPDPath); err != nil { + return err + } + // Detach the disk + volumes, err := pd.getVolumeProvider() + if err != nil { + return err + } + if err := volumes.DetachDisk("", pd.pdName); err != nil { + return err + } + return nil +} + +// safe_format_and_mount is a utility script on AWS VMs that probes a persistent disk, and if +// necessary formats it before mounting it. +// This eliminates the necesisty to format a PD before it is used with a Pod on AWS. +// TODO: port this script into Go and use it for all Linux platforms +type awsSafeFormatAndMount struct { + mount.Interface + runner exec.Interface +} + +// uses /usr/share/google/safe_format_and_mount to optionally mount, and format a disk +func (mounter *awsSafeFormatAndMount) Mount(source string, target string, fstype string, flags uintptr, data string) error { + // Don't attempt to format if mounting as readonly. Go straight to mounting. + if (flags & mount.FlagReadOnly) != 0 { + return mounter.Interface.Mount(source, target, fstype, flags, data) + } + args := []string{} + // ext4 is the default for safe_format_and_mount + if len(fstype) > 0 && fstype != "ext4" { + args = append(args, "-m", fmt.Sprintf("mkfs.%s", fstype)) + } + args = append(args, source, target) + // TODO: Accept other options here? + glog.V(5).Infof("exec-ing: /usr/share/google/safe_format_and_mount %v", args) + cmd := mounter.runner.Command("/usr/share/google/safe_format_and_mount", args...) + dataOut, err := cmd.CombinedOutput() + if err != nil { + glog.V(5).Infof("error running /usr/share/google/safe_format_and_mount\n%s", string(dataOut)) + } + return err +} diff --git a/pkg/volume/aws_pd/aws_util_test.go b/pkg/volume/aws_pd/aws_util_test.go new file mode 100644 index 00000000000..fc8970426be --- /dev/null +++ b/pkg/volume/aws_pd/aws_util_test.go @@ -0,0 +1,84 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_pd + +import ( + "fmt" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" +) + +func TestSafeFormatAndMount(t *testing.T) { + tests := []struct { + fstype string + expectedArgs []string + err error + }{ + { + fstype: "ext4", + expectedArgs: []string{"/dev/foo", "/mnt/bar"}, + }, + { + fstype: "vfat", + expectedArgs: []string{"-m", "mkfs.vfat", "/dev/foo", "/mnt/bar"}, + }, + { + err: fmt.Errorf("test error"), + }, + } + for _, test := range tests { + + var cmdOut string + var argsOut []string + fake := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + cmdOut = cmd + argsOut = args + fake := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte{}, test.err }, + }, + } + return exec.InitFakeCmd(&fake, cmd, args...) + }, + }, + } + + mounter := awsSafeFormatAndMount{ + runner: &fake, + } + + err := mounter.Mount("/dev/foo", "/mnt/bar", test.fstype, 0, "") + if test.err == nil && err != nil { + t.Errorf("unexpected error: %v", err) + } + if test.err != nil { + if err == nil { + t.Errorf("unexpected non-error") + } + return + } + if cmdOut != "/usr/share/google/safe_format_and_mount" { + t.Errorf("unexpected command: %s", cmdOut) + } + if len(argsOut) != len(test.expectedArgs) { + t.Errorf("unexpected args: %v, expected: %v", argsOut, test.expectedArgs) + } + } +} diff --git a/test/e2e/driver.go b/test/e2e/driver.go index 29db19667de..813ee48ce1b 100644 --- a/test/e2e/driver.go +++ b/test/e2e/driver.go @@ -25,6 +25,7 @@ import ( "strings" "syscall" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" "github.com/onsi/ginkgo" @@ -35,10 +36,12 @@ import ( type testResult bool -type GCEConfig struct { +type CloudConfig struct { ProjectID string Zone string MasterName string + + Provider cloudprovider.Interface } func init() { diff --git a/test/e2e/pd.go b/test/e2e/pd.go index eedd6ff9104..e2f7def4a58 100644 --- a/test/e2e/pd.go +++ b/test/e2e/pd.go @@ -24,6 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -35,7 +36,6 @@ var _ = Describe("PD", func() { var ( c *client.Client podClient client.PodInterface - diskName string host0Name string host1Name string ) @@ -51,37 +51,37 @@ var _ = Describe("PD", func() { expectNoError(err, "Failed to list nodes for e2e cluster.") Expect(len(nodes.Items) >= 2).To(BeTrue()) - diskName = fmt.Sprintf("e2e-%s", string(util.NewUUID())) host0Name = nodes.Items[0].ObjectMeta.Name host1Name = nodes.Items[1].ObjectMeta.Name }) It("should schedule a pod w/ a RW PD, remove it, then schedule it on another host", func() { - if testContext.Provider != "gce" { - By(fmt.Sprintf("Skipping PD test, which is only supported for provider gce (not %s)", - testContext.Provider)) + if testContext.provider != "gce" && testContext.provider != "aws" { + By(fmt.Sprintf("Skipping PD test, which is only supported for providers gce & aws (not %s)", + testContext.provider)) return } + By("creating PD") + diskName, err := createPD() + expectNoError(err, "Error creating PD") + host0Pod := testPDPod(diskName, host0Name, false) host1Pod := testPDPod(diskName, host1Name, false) - By(fmt.Sprintf("creating PD %q", diskName)) - expectNoError(createPD(diskName, testContext.GCEConfig.Zone), "Error creating PD") - defer func() { By("cleaning up PD-RW test environment") // Teardown pods, PD. Ignore errors. // Teardown should do nothing unless test failed. podClient.Delete(host0Pod.Name) podClient.Delete(host1Pod.Name) - detachPD(host0Name, diskName, testContext.GCEConfig.Zone) - detachPD(host1Name, diskName, testContext.GCEConfig.Zone) - deletePD(diskName, testContext.GCEConfig.Zone) + detachPD(host0Name, diskName) + detachPD(host1Name, diskName) + deletePD(diskName) }() By("submitting host0Pod to kubernetes") - _, err := podClient.Create(host0Pod) + _, err = podClient.Create(host0Pod) expectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err)) expectNoError(waitForPodRunning(c, host0Pod.Name)) @@ -100,7 +100,7 @@ var _ = Describe("PD", func() { By(fmt.Sprintf("deleting PD %q", diskName)) for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) { - if err = deletePD(diskName, testContext.GCEConfig.Zone); err != nil { + if err = deletePD(diskName); err != nil { Logf("Couldn't delete PD. Sleeping 5 seconds") continue } @@ -118,6 +118,10 @@ var _ = Describe("PD", func() { return } + By("creating PD") + diskName, err := createPD() + expectNoError(err, "Error creating PD") + rwPod := testPDPod(diskName, host0Name, false) host0ROPod := testPDPod(diskName, host0Name, true) host1ROPod := testPDPod(diskName, host1Name, true) @@ -129,16 +133,14 @@ var _ = Describe("PD", func() { podClient.Delete(rwPod.Name) podClient.Delete(host0ROPod.Name) podClient.Delete(host1ROPod.Name) - detachPD(host0Name, diskName, testContext.GCEConfig.Zone) - detachPD(host1Name, diskName, testContext.GCEConfig.Zone) - deletePD(diskName, testContext.GCEConfig.Zone) + + detachPD(host0Name, diskName) + detachPD(host1Name, diskName) + deletePD(diskName) }() - By(fmt.Sprintf("creating PD %q", diskName)) - expectNoError(createPD(diskName, testContext.GCEConfig.Zone), "Error creating PD") - By("submitting rwPod to ensure PD is formatted") - _, err := podClient.Create(rwPod) + _, err = podClient.Create(rwPod) expectNoError(err, "Failed to create rwPod") expectNoError(waitForPodRunning(c, rwPod.Name)) expectNoError(podClient.Delete(rwPod.Name), "Failed to delete host0Pod") @@ -163,7 +165,7 @@ var _ = Describe("PD", func() { By(fmt.Sprintf("deleting PD %q", diskName)) for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) { - if err = deletePD(diskName, testContext.GCEConfig.Zone); err != nil { + if err = deletePD(diskName); err != nil { Logf("Couldn't delete PD. Sleeping 5 seconds") continue } @@ -173,24 +175,70 @@ var _ = Describe("PD", func() { }) }) -func createPD(pdName, zone string) error { - // TODO: make this hit the compute API directly instread of shelling out to gcloud. - return exec.Command("gcloud", "compute", "disks", "create", "--zone="+zone, "--size=10GB", pdName).Run() +func createPD() (string, error) { + if testContext.provider == "gce" { + pdName := fmt.Sprintf("e2e-%s", string(util.NewUUID())) + + zone := testContext.cloudConfig.Zone + // TODO: make this hit the compute API directly instread of shelling out to gcloud. + err := exec.Command("gcloud", "compute", "disks", "create", "--zone="+zone, "--size=10GB", pdName).Run() + if err != nil { + return "", err + } + return pdName, nil + } else if testContext.provider == "aws" { + volumes, ok := testContext.cloudConfig.Provider.(aws_cloud.Volumes) + if !ok { + return "", fmt.Errorf("Provider does not implement volumes interface") + } + volumeOptions := &aws_cloud.VolumeOptions{} + volumeOptions.CapacityMB = 10 * 1024 + volumeOptions.Zone = testContext.cloudConfig.Zone + return volumes.CreateVolume(volumeOptions) + } else { + return "", fmt.Errorf("Unsupported provider type") + } } -func deletePD(pdName, zone string) error { - // TODO: make this hit the compute API directly. - return exec.Command("gcloud", "compute", "disks", "delete", "--zone="+zone, pdName).Run() +func deletePD(pdName string) error { + if testContext.provider == "gce" { + zone := testContext.cloudConfig.Zone + + // TODO: make this hit the compute API directly. + return exec.Command("gcloud", "compute", "disks", "delete", "--zone="+zone, pdName).Run() + } else if testContext.provider == "aws" { + volumes, ok := testContext.cloudConfig.Provider.(aws_cloud.Volumes) + if !ok { + return fmt.Errorf("Provider does not implement volumes interface") + } + return volumes.DeleteVolume(pdName) + } else { + return fmt.Errorf("Unsupported provider type") + } } -func detachPD(hostName, pdName, zone string) error { - instanceName := strings.Split(hostName, ".")[0] - // TODO: make this hit the compute API directly. - return exec.Command("gcloud", "compute", "instances", "detach-disk", "--zone="+zone, "--disk="+pdName, instanceName).Run() +func detachPD(hostName, pdName string) error { + if testContext.provider == "gce" { + instanceName := strings.Split(hostName, ".")[0] + + zone := testContext.cloudConfig.Zone + + // TODO: make this hit the compute API directly. + return exec.Command("gcloud", "compute", "instances", "detach-disk", "--zone="+zone, "--disk="+pdName, instanceName).Run() + } else if testContext.provider == "aws" { + volumes, ok := testContext.cloudConfig.Provider.(aws_cloud.Volumes) + if !ok { + return fmt.Errorf("Provider does not implement volumes interface") + } + return volumes.DetachDisk(hostName, pdName) + } else { + return fmt.Errorf("Unsupported provider type") + } + } func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod { - return &api.Pod{ + pod := &api.Pod{ TypeMeta: api.TypeMeta{ Kind: "Pod", APIVersion: "v1beta1", @@ -199,18 +247,6 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod { Name: "pd-test-" + string(util.NewUUID()), }, Spec: api.PodSpec{ - Volumes: []api.Volume{ - { - Name: "testpd", - VolumeSource: api.VolumeSource{ - GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ - PDName: diskName, - FSType: "ext4", - ReadOnly: readOnly, - }, - }, - }, - }, Containers: []api.Container{ { Name: "testpd", @@ -226,4 +262,36 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod { Host: targetHost, }, } + + if testContext.provider == "gce" { + pod.Spec.Volumes = []api.Volume{ + { + Name: "testpd", + VolumeSource: api.VolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: diskName, + FSType: "ext4", + ReadOnly: readOnly, + }, + }, + }, + } + } else if testContext.provider == "aws" { + pod.Spec.Volumes = []api.Volume{ + { + Name: "testpd", + VolumeSource: api.VolumeSource{ + AWSPersistentDisk: &api.AWSPersistentDiskVolumeSource{ + PDName: diskName, + FSType: "ext4", + ReadOnly: readOnly, + }, + }, + }, + } + } else { + panic("Unknown provider: " + testContext.provider) + } + + return pod }