From 1f156016e663446e81b9e637537cd771b69d74c9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 15 Mar 2021 10:59:32 -0700 Subject: [PATCH] delete leaked volume if driver don't know the volume status -- aws --- .../k8s.io/legacy-cloud-providers/aws/aws.go | 22 ++++++-- .../legacy-cloud-providers/aws/aws_test.go | 50 +++++++++++++++++++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go index 3588f38cb4e..0ba278fcefe 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go @@ -2555,7 +2555,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er createType = DefaultVolumeType default: - return "", fmt.Errorf("invalid AWS VolumeType %q", volumeOptions.VolumeType) + return KubernetesVolumeID(""), fmt.Errorf("invalid AWS VolumeType %q", volumeOptions.VolumeType) } request := &ec2.CreateVolumeInput{} @@ -2587,12 +2587,12 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er response, err := c.ec2.CreateVolume(request) if err != nil { - return "", err + return KubernetesVolumeID(""), err } awsID := EBSVolumeID(aws.StringValue(response.VolumeId)) if awsID == "" { - return "", fmt.Errorf("VolumeID was not returned by CreateVolume") + return KubernetesVolumeID(""), fmt.Errorf("VolumeID was not returned by CreateVolume") } volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID)) @@ -2605,8 +2605,22 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er // because Kubernetes may have limited permissions to the key. if isAWSErrorVolumeNotFound(err) { err = fmt.Errorf("failed to create encrypted volume: the volume disappeared after creation, most likely due to inaccessible KMS encryption key") + } else { + // When DescribeVolumes api failed, plugin will lose track on the volumes' state + // driver should be able to clean up these kind of volumes to make sure they are not leaked on customers' account + klog.V(5).Infof("Failed to create the volume %v due to %v. Will try to delete it.", volumeName, err) + awsDisk, newDiskError := newAWSDisk(c, volumeName) + if newDiskError != nil { + klog.Errorf("Failed to delete the volume %v due to error: %v", volumeName, newDiskError) + } else { + if _, deleteVolumeError := awsDisk.deleteVolume(); deleteVolumeError != nil { + klog.Errorf("Failed to delete the volume %v due to error: %v", volumeName, deleteVolumeError) + } else { + klog.V(5).Infof("%v is deleted because it is not in desired state after waiting", volumeName) + } + } } - return "", err + return KubernetesVolumeID(""), err } return volumeName, nil diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go index a6cd5d29a6c..d168eb8467e 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go @@ -73,6 +73,11 @@ func (m *MockedFakeEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*e return args.Get(0).([]*ec2.Volume), nil } +func (m *MockedFakeEC2) DeleteVolume(request *ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error) { + args := m.Called(request) + return args.Get(0).(*ec2.DeleteVolumeOutput), nil +} + func (m *MockedFakeEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) { args := m.Called(request) return args.Get(0).([]*ec2.SecurityGroup), nil @@ -2363,6 +2368,51 @@ func TestCreateDisk(t *testing.T) { awsServices.ec2.(*MockedFakeEC2).AssertExpectations(t) } +func TestCreateDiskFailDescribeVolume(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + volumeOptions := &VolumeOptions{ + AvailabilityZone: "us-east-1a", + CapacityGB: 10, + } + request := &ec2.CreateVolumeInput{ + AvailabilityZone: aws.String("us-east-1a"), + Encrypted: aws.Bool(false), + VolumeType: aws.String(DefaultVolumeType), + Size: aws.Int64(10), + TagSpecifications: []*ec2.TagSpecification{ + {ResourceType: aws.String(ec2.ResourceTypeVolume), Tags: []*ec2.Tag{ + // CreateVolume from MockedFakeEC2 expects sorted tags, so we need to + // always have these tags sorted: + {Key: aws.String(TagNameKubernetesClusterLegacy), Value: aws.String(TestClusterID)}, + {Key: aws.String(fmt.Sprintf("%s%s", TagNameKubernetesClusterPrefix, TestClusterID)), Value: aws.String(ResourceLifecycleOwned)}, + }}, + }, + } + + volume := &ec2.Volume{ + AvailabilityZone: aws.String("us-east-1a"), + VolumeId: aws.String("vol-volumeId0"), + State: aws.String("creating"), + } + awsServices.ec2.(*MockedFakeEC2).On("CreateVolume", request).Return(volume, nil) + + describeVolumesRequest := &ec2.DescribeVolumesInput{ + VolumeIds: []*string{aws.String("vol-volumeId0")}, + } + deleteVolumeRequest := &ec2.DeleteVolumeInput{ + VolumeId: aws.String("vol-volumeId0"), + } + awsServices.ec2.(*MockedFakeEC2).On("DescribeVolumes", describeVolumesRequest).Return([]*ec2.Volume{volume}, nil) + awsServices.ec2.(*MockedFakeEC2).On("DeleteVolume", deleteVolumeRequest).Return(&ec2.DeleteVolumeOutput{}, nil) + + volumeID, err := c.CreateDisk(volumeOptions) + assert.Error(t, err) + assert.Equal(t, volumeID, KubernetesVolumeID("")) + awsServices.ec2.(*MockedFakeEC2).AssertExpectations(t) +} + func TestRegionIsValid(t *testing.T) { fake := newMockedFakeAWSServices("fakeCluster") fake.selfInstance.Placement = &ec2.Placement{