From 8c49d1db022f2feaf14e3dc255ba0e9f418aa47e Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 20 Nov 2017 17:38:09 -0500 Subject: [PATCH] Implement disk resizing for AWS Update bazel files --- pkg/cloudprovider/providers/aws/BUILD | 1 + pkg/cloudprovider/providers/aws/aws.go | 128 ++++++++++++++++++ pkg/cloudprovider/providers/aws/aws_fakes.go | 8 ++ pkg/volume/aws_ebs/BUILD | 1 + pkg/volume/aws_ebs/attacher_test.go | 8 ++ pkg/volume/aws_ebs/aws_ebs.go | 27 ++++ .../admission/persistentvolume/label/BUILD | 1 + .../persistentvolume/label/admission_test.go | 8 ++ .../persistentvolume/resize/admission.go | 6 +- 9 files changed, 187 insertions(+), 1 deletion(-) diff --git a/pkg/cloudprovider/providers/aws/BUILD b/pkg/cloudprovider/providers/aws/BUILD index 5936b00945d..9a4a6a76539 100644 --- a/pkg/cloudprovider/providers/aws/BUILD +++ b/pkg/cloudprovider/providers/aws/BUILD @@ -50,6 +50,7 @@ go_library( "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/gopkg.in/gcfg.v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 292210bacb1..c25b0f58ea2 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -48,6 +48,7 @@ import ( "path" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -285,6 +286,10 @@ type EC2 interface { // Delete an EBS volume DeleteVolume(*ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error) + ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) + + DescribeVolumeModifications(*ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error) + DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error) @@ -453,6 +458,9 @@ type Volumes interface { // Check if disks specified in argument map are still attached to their respective nodes. DisksAreAttached(map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error) + + // Expand the disk to new size + ResizeDisk(diskName KubernetesVolumeID, oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error) } // InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups @@ -833,6 +841,36 @@ func (s *awsSdkEC2) DeleteVolume(request *ec2.DeleteVolumeInput) (*ec2.DeleteVol return resp, err } +func (s *awsSdkEC2) ModifyVolume(request *ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) { + requestTime := time.Now() + resp, err := s.ec2.ModifyVolume(request) + timeTaken := time.Since(requestTime).Seconds() + recordAwsMetric("modify_volume", timeTaken, err) + return resp, err +} + +func (s *awsSdkEC2) DescribeVolumeModifications(request *ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error) { + requestTime := time.Now() + results := []*ec2.VolumeModification{} + var nextToken *string + for { + resp, err := s.ec2.DescribeVolumesModifications(request) + if err != nil { + recordAwsMetric("describe_volume_modification", 0, err) + return nil, fmt.Errorf("error listing volume modifictions : %v", err) + } + results = append(results, resp.VolumesModifications...) + nextToken = resp.NextToken + if aws.StringValue(nextToken) == "" { + break + } + request.NextToken = nextToken + } + timeTaken := time.Since(requestTime).Seconds() + recordAwsMetric("describe_volume_modification", timeTaken, nil) + return results, nil +} + func (s *awsSdkEC2) DescribeSubnets(request *ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) { // Subnets are not paged response, err := s.ec2.DescribeSubnets(request) @@ -1653,6 +1691,65 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) { return volumes[0], nil } +func (d *awsDisk) describeVolumeModification() (*ec2.VolumeModification, error) { + volumeID := d.awsID + request := &ec2.DescribeVolumesModificationsInput{ + VolumeIds: []*string{volumeID.awsString()}, + } + volumeMods, err := d.ec2.DescribeVolumeModifications(request) + + if err != nil { + return nil, fmt.Errorf("error describing volume modification %s with %v", volumeID, err) + } + + if len(volumeMods) == 0 { + return nil, fmt.Errorf("no volume modifications found for %s", volumeID) + } + lastIndex := len(volumeMods) - 1 + return volumeMods[lastIndex], nil +} + +func (d *awsDisk) modifyVolume(requestGiB int64) (int64, error) { + volumeID := d.awsID + + request := &ec2.ModifyVolumeInput{ + VolumeId: volumeID.awsString(), + Size: aws.Int64(requestGiB), + } + output, err := d.ec2.ModifyVolume(request) + if err != nil { + modifyError := fmt.Errorf("AWS modifyVolume failed for %s with %v", volumeID, err) + return requestGiB, modifyError + } + + volumeModification := output.VolumeModification + + if aws.StringValue(volumeModification.ModificationState) == ec2.VolumeModificationStateCompleted { + return aws.Int64Value(volumeModification.TargetSize), nil + } + + backoff := wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2, + Steps: 10, + } + + checkForResize := func() (bool, error) { + volumeModification, err := d.describeVolumeModification() + + if err != nil { + return false, err + } + + if aws.StringValue(volumeModification.ModificationState) == ec2.VolumeModificationStateCompleted { + return true, nil + } + return false, nil + } + waitWithErr := wait.ExponentialBackoff(backoff, checkForResize) + return requestGiB, waitWithErr +} + // applyUnSchedulableTaint applies a unschedulable taint to a node after verifying // if node has become unusable because of volumes getting stuck in attaching state. func (c *Cloud) applyUnSchedulableTaint(nodeName types.NodeName, reason string) { @@ -2321,6 +2418,37 @@ func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolume return attached, nil } +func (c *Cloud) ResizeDisk( + diskName KubernetesVolumeID, + oldSize resource.Quantity, + newSize resource.Quantity) (resource.Quantity, error) { + awsDisk, err := newAWSDisk(c, diskName) + if err != nil { + return oldSize, err + } + + volumeInfo, err := awsDisk.describeVolume() + if err != nil { + descErr := fmt.Errorf("AWS.ResizeDisk Error describing volume %s with %v", diskName, err) + return oldSize, descErr + } + requestBytes := newSize.Value() + // AWS resizes in chunks of GiB (not GB) + requestGiB := volume.RoundUpSize(requestBytes, 1024*1024*1024) + newSizeQuant := resource.MustParse(fmt.Sprintf("%dGi", requestGiB)) + + // If disk already if of greater or equal size than requested we return + if aws.Int64Value(volumeInfo.Size) >= requestGiB { + return newSizeQuant, nil + } + _, err = awsDisk.modifyVolume(requestGiB) + + if err != nil { + return oldSize, err + } + return newSizeQuant, nil +} + // Gets the current load balancer state func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) { request := &elb.DescribeLoadBalancersInput{} diff --git a/pkg/cloudprovider/providers/aws/aws_fakes.go b/pkg/cloudprovider/providers/aws/aws_fakes.go index 036968673e7..77436327230 100644 --- a/pkg/cloudprovider/providers/aws/aws_fakes.go +++ b/pkg/cloudprovider/providers/aws/aws_fakes.go @@ -203,6 +203,14 @@ func (ec2i *FakeEC2Impl) RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngr panic("Not implemented") } +func (ec2i *FakeEC2Impl) DescribeVolumeModifications(*ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error) { + panic("Not implemented") +} + +func (ec2i *FakeEC2Impl) ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) { + panic("Not implemented") +} + func (ec2i *FakeEC2Impl) CreateSubnet(request *ec2.Subnet) (*ec2.CreateSubnetOutput, error) { ec2i.Subnets = append(ec2i.Subnets, request) response := &ec2.CreateSubnetOutput{ diff --git a/pkg/volume/aws_ebs/BUILD b/pkg/volume/aws_ebs/BUILD index 5d45520f468..e61d65a9da2 100644 --- a/pkg/volume/aws_ebs/BUILD +++ b/pkg/volume/aws_ebs/BUILD @@ -46,6 +46,7 @@ go_test( "//pkg/volume/testing:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", diff --git a/pkg/volume/aws_ebs/attacher_test.go b/pkg/volume/aws_ebs/attacher_test.go index 16df5812578..813139e5b95 100644 --- a/pkg/volume/aws_ebs/attacher_test.go +++ b/pkg/volume/aws_ebs/attacher_test.go @@ -26,6 +26,7 @@ import ( volumetest "k8s.io/kubernetes/pkg/volume/testing" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" ) @@ -340,3 +341,10 @@ func (testcase *testcase) GetVolumeLabels(volumeName aws.KubernetesVolumeID) (ma func (testcase *testcase) GetDiskPath(volumeName aws.KubernetesVolumeID) (string, error) { return "", errors.New("Not implemented") } + +func (testcase *testcase) ResizeDisk( + volumeName aws.KubernetesVolumeID, + oldSize resource.Quantity, + newSize resource.Quantity) (resource.Quantity, error) { + return oldSize, errors.New("Not implemented") +} diff --git a/pkg/volume/aws_ebs/aws_ebs.go b/pkg/volume/aws_ebs/aws_ebs.go index 5c9f2702c00..9a0091928e2 100644 --- a/pkg/volume/aws_ebs/aws_ebs.go +++ b/pkg/volume/aws_ebs/aws_ebs.go @@ -241,6 +241,33 @@ func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath return volume.NewSpecFromVolume(awsVolume), nil } +func (plugin *awsElasticBlockStorePlugin) RequiresFSResize() bool { + return true +} + +func (plugin *awsElasticBlockStorePlugin) ExpandVolumeDevice( + spec *volume.Spec, + newSize resource.Quantity, + oldSize resource.Quantity) (resource.Quantity, error) { + var awsVolume aws.Volumes + + awsVolume, err := getCloudProvider(plugin.host.GetCloudProvider()) + + if err != nil { + return oldSize, err + } + // we don't expect to receive this call for non PVs + rawVolumeName := spec.PersistentVolume.Spec.AWSElasticBlockStore.VolumeID + volumeID := aws.KubernetesVolumeID(rawVolumeName) + + if volumeID == "" { + return oldSize, fmt.Errorf("EBS.ExpandVolumeDevice Invalid volume id for %s", spec.Name()) + } + return awsVolume.ResizeDisk(volumeID, oldSize, newSize) +} + +var _ volume.ExpandableVolumePlugin = &awsElasticBlockStorePlugin{} + // Abstract interface to PD operations. type ebsManager interface { CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID aws.KubernetesVolumeID, volumeSizeGB int, labels map[string]string, fstype string, err error) diff --git a/plugin/pkg/admission/persistentvolume/label/BUILD b/plugin/pkg/admission/persistentvolume/label/BUILD index 0c04cbc99d2..b88f19f863e 100644 --- a/plugin/pkg/admission/persistentvolume/label/BUILD +++ b/plugin/pkg/admission/persistentvolume/label/BUILD @@ -34,6 +34,7 @@ go_test( deps = [ "//pkg/apis/core:go_default_library", "//pkg/cloudprovider/providers/aws:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", diff --git a/plugin/pkg/admission/persistentvolume/label/admission_test.go b/plugin/pkg/admission/persistentvolume/label/admission_test.go index d13dcc30384..e349c756a4d 100644 --- a/plugin/pkg/admission/persistentvolume/label/admission_test.go +++ b/plugin/pkg/admission/persistentvolume/label/admission_test.go @@ -21,6 +21,7 @@ import ( "fmt" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/admission" @@ -67,6 +68,13 @@ func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.Kubern return nil, fmt.Errorf("not implemented") } +func (c *mockVolumes) ResizeDisk( + diskName aws.KubernetesVolumeID, + oldSize resource.Quantity, + newSize resource.Quantity) (resource.Quantity, error) { + return oldSize, nil +} + func mockVolumeFailure(err error) *mockVolumes { return &mockVolumes{volumeLabelsError: err} } diff --git a/plugin/pkg/admission/persistentvolume/resize/admission.go b/plugin/pkg/admission/persistentvolume/resize/admission.go index b550edb5ff4..3c98ad78ed1 100644 --- a/plugin/pkg/admission/persistentvolume/resize/admission.go +++ b/plugin/pkg/admission/persistentvolume/resize/admission.go @@ -159,6 +159,10 @@ func (pvcr *persistentVolumeClaimResize) checkVolumePlugin(pv *api.PersistentVol if pv.Spec.GCEPersistentDisk != nil { return true } - return false + if pv.Spec.AWSElasticBlockStore != nil { + return true + } + + return false }