Merge pull request #56118 from gnufied/implement-ebs-resize

Automatic merge from submit-queue (batch tested with PRs 56249, 56118, 56255, 56252, 56256). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Implement ebs resize

Implement EBS disk resizing. 

xref - kubernetes/features#284


```release-note
Add support for resizing EBS disks
```
This commit is contained in:
Kubernetes Submit Queue 2017-11-23 00:23:21 -08:00 committed by GitHub
commit d42af03d8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 187 additions and 1 deletions

View File

@ -50,6 +50,7 @@ go_library(
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/gopkg.in/gcfg.v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",

View File

@ -48,6 +48,7 @@ import (
"path"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@ -285,6 +286,10 @@ type EC2 interface {
// Delete an EBS volume
DeleteVolume(*ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error)
ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error)
DescribeVolumeModifications(*ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error)
DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error)
CreateSecurityGroup(*ec2.CreateSecurityGroupInput) (*ec2.CreateSecurityGroupOutput, error)
@ -453,6 +458,9 @@ type Volumes interface {
// Check if disks specified in argument map are still attached to their respective nodes.
DisksAreAttached(map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error)
// Expand the disk to new size
ResizeDisk(diskName KubernetesVolumeID, oldSize resource.Quantity, newSize resource.Quantity) (resource.Quantity, error)
}
// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@ -833,6 +841,36 @@ func (s *awsSdkEC2) DeleteVolume(request *ec2.DeleteVolumeInput) (*ec2.DeleteVol
return resp, err
}
func (s *awsSdkEC2) ModifyVolume(request *ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) {
requestTime := time.Now()
resp, err := s.ec2.ModifyVolume(request)
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("modify_volume", timeTaken, err)
return resp, err
}
func (s *awsSdkEC2) DescribeVolumeModifications(request *ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error) {
requestTime := time.Now()
results := []*ec2.VolumeModification{}
var nextToken *string
for {
resp, err := s.ec2.DescribeVolumesModifications(request)
if err != nil {
recordAwsMetric("describe_volume_modification", 0, err)
return nil, fmt.Errorf("error listing volume modifictions : %v", err)
}
results = append(results, resp.VolumesModifications...)
nextToken = resp.NextToken
if aws.StringValue(nextToken) == "" {
break
}
request.NextToken = nextToken
}
timeTaken := time.Since(requestTime).Seconds()
recordAwsMetric("describe_volume_modification", timeTaken, nil)
return results, nil
}
func (s *awsSdkEC2) DescribeSubnets(request *ec2.DescribeSubnetsInput) ([]*ec2.Subnet, error) {
// Subnets are not paged
response, err := s.ec2.DescribeSubnets(request)
@ -1653,6 +1691,65 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) {
return volumes[0], nil
}
func (d *awsDisk) describeVolumeModification() (*ec2.VolumeModification, error) {
volumeID := d.awsID
request := &ec2.DescribeVolumesModificationsInput{
VolumeIds: []*string{volumeID.awsString()},
}
volumeMods, err := d.ec2.DescribeVolumeModifications(request)
if err != nil {
return nil, fmt.Errorf("error describing volume modification %s with %v", volumeID, err)
}
if len(volumeMods) == 0 {
return nil, fmt.Errorf("no volume modifications found for %s", volumeID)
}
lastIndex := len(volumeMods) - 1
return volumeMods[lastIndex], nil
}
func (d *awsDisk) modifyVolume(requestGiB int64) (int64, error) {
volumeID := d.awsID
request := &ec2.ModifyVolumeInput{
VolumeId: volumeID.awsString(),
Size: aws.Int64(requestGiB),
}
output, err := d.ec2.ModifyVolume(request)
if err != nil {
modifyError := fmt.Errorf("AWS modifyVolume failed for %s with %v", volumeID, err)
return requestGiB, modifyError
}
volumeModification := output.VolumeModification
if aws.StringValue(volumeModification.ModificationState) == ec2.VolumeModificationStateCompleted {
return aws.Int64Value(volumeModification.TargetSize), nil
}
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 2,
Steps: 10,
}
checkForResize := func() (bool, error) {
volumeModification, err := d.describeVolumeModification()
if err != nil {
return false, err
}
if aws.StringValue(volumeModification.ModificationState) == ec2.VolumeModificationStateCompleted {
return true, nil
}
return false, nil
}
waitWithErr := wait.ExponentialBackoff(backoff, checkForResize)
return requestGiB, waitWithErr
}
// applyUnSchedulableTaint applies a unschedulable taint to a node after verifying
// if node has become unusable because of volumes getting stuck in attaching state.
func (c *Cloud) applyUnSchedulableTaint(nodeName types.NodeName, reason string) {
@ -2321,6 +2418,37 @@ func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolume
return attached, nil
}
func (c *Cloud) ResizeDisk(
diskName KubernetesVolumeID,
oldSize resource.Quantity,
newSize resource.Quantity) (resource.Quantity, error) {
awsDisk, err := newAWSDisk(c, diskName)
if err != nil {
return oldSize, err
}
volumeInfo, err := awsDisk.describeVolume()
if err != nil {
descErr := fmt.Errorf("AWS.ResizeDisk Error describing volume %s with %v", diskName, err)
return oldSize, descErr
}
requestBytes := newSize.Value()
// AWS resizes in chunks of GiB (not GB)
requestGiB := volume.RoundUpSize(requestBytes, 1024*1024*1024)
newSizeQuant := resource.MustParse(fmt.Sprintf("%dGi", requestGiB))
// If disk already if of greater or equal size than requested we return
if aws.Int64Value(volumeInfo.Size) >= requestGiB {
return newSizeQuant, nil
}
_, err = awsDisk.modifyVolume(requestGiB)
if err != nil {
return oldSize, err
}
return newSizeQuant, nil
}
// Gets the current load balancer state
func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) {
request := &elb.DescribeLoadBalancersInput{}

View File

@ -203,6 +203,14 @@ func (ec2i *FakeEC2Impl) RevokeSecurityGroupIngress(*ec2.RevokeSecurityGroupIngr
panic("Not implemented")
}
func (ec2i *FakeEC2Impl) DescribeVolumeModifications(*ec2.DescribeVolumesModificationsInput) ([]*ec2.VolumeModification, error) {
panic("Not implemented")
}
func (ec2i *FakeEC2Impl) ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) {
panic("Not implemented")
}
func (ec2i *FakeEC2Impl) CreateSubnet(request *ec2.Subnet) (*ec2.CreateSubnetOutput, error) {
ec2i.Subnets = append(ec2i.Subnets, request)
response := &ec2.CreateSubnetOutput{

View File

@ -46,6 +46,7 @@ go_test(
"//pkg/volume/testing:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",

View File

@ -26,6 +26,7 @@ import (
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
)
@ -340,3 +341,10 @@ func (testcase *testcase) GetVolumeLabels(volumeName aws.KubernetesVolumeID) (ma
func (testcase *testcase) GetDiskPath(volumeName aws.KubernetesVolumeID) (string, error) {
return "", errors.New("Not implemented")
}
func (testcase *testcase) ResizeDisk(
volumeName aws.KubernetesVolumeID,
oldSize resource.Quantity,
newSize resource.Quantity) (resource.Quantity, error) {
return oldSize, errors.New("Not implemented")
}

View File

@ -241,6 +241,33 @@ func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath
return volume.NewSpecFromVolume(awsVolume), nil
}
func (plugin *awsElasticBlockStorePlugin) RequiresFSResize() bool {
return true
}
func (plugin *awsElasticBlockStorePlugin) ExpandVolumeDevice(
spec *volume.Spec,
newSize resource.Quantity,
oldSize resource.Quantity) (resource.Quantity, error) {
var awsVolume aws.Volumes
awsVolume, err := getCloudProvider(plugin.host.GetCloudProvider())
if err != nil {
return oldSize, err
}
// we don't expect to receive this call for non PVs
rawVolumeName := spec.PersistentVolume.Spec.AWSElasticBlockStore.VolumeID
volumeID := aws.KubernetesVolumeID(rawVolumeName)
if volumeID == "" {
return oldSize, fmt.Errorf("EBS.ExpandVolumeDevice Invalid volume id for %s", spec.Name())
}
return awsVolume.ResizeDisk(volumeID, oldSize, newSize)
}
var _ volume.ExpandableVolumePlugin = &awsElasticBlockStorePlugin{}
// Abstract interface to PD operations.
type ebsManager interface {
CreateVolume(provisioner *awsElasticBlockStoreProvisioner) (volumeID aws.KubernetesVolumeID, volumeSizeGB int, labels map[string]string, fstype string, err error)

View File

@ -34,6 +34,7 @@ go_test(
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",

View File

@ -21,6 +21,7 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
@ -67,6 +68,13 @@ func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.Kubern
return nil, fmt.Errorf("not implemented")
}
func (c *mockVolumes) ResizeDisk(
diskName aws.KubernetesVolumeID,
oldSize resource.Quantity,
newSize resource.Quantity) (resource.Quantity, error) {
return oldSize, nil
}
func mockVolumeFailure(err error) *mockVolumes {
return &mockVolumes{volumeLabelsError: err}
}

View File

@ -156,6 +156,10 @@ func (pvcr *persistentVolumeClaimResize) checkVolumePlugin(pv *api.PersistentVol
if pv.Spec.GCEPersistentDisk != nil {
return true
}
return false
if pv.Spec.AWSElasticBlockStore != nil {
return true
}
return false
}