Merge pull request #99664 from AndyXiangLi/aws-delete-on-creation-fail

delete leaked volume if driver don't know the volume status -- aws
This commit is contained in:
Kubernetes Prow Robot 2021-04-08 15:59:01 -07:00 committed by GitHub
commit b0fb5264e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 4 deletions

View File

@ -2555,7 +2555,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er
createType = DefaultVolumeType
default:
return "", fmt.Errorf("invalid AWS VolumeType %q", volumeOptions.VolumeType)
return KubernetesVolumeID(""), fmt.Errorf("invalid AWS VolumeType %q", volumeOptions.VolumeType)
}
request := &ec2.CreateVolumeInput{}
@ -2587,12 +2587,12 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er
response, err := c.ec2.CreateVolume(request)
if err != nil {
return "", err
return KubernetesVolumeID(""), err
}
awsID := EBSVolumeID(aws.StringValue(response.VolumeId))
if awsID == "" {
return "", fmt.Errorf("VolumeID was not returned by CreateVolume")
return KubernetesVolumeID(""), fmt.Errorf("VolumeID was not returned by CreateVolume")
}
volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID))
@ -2605,8 +2605,22 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er
// because Kubernetes may have limited permissions to the key.
if isAWSErrorVolumeNotFound(err) {
err = fmt.Errorf("failed to create encrypted volume: the volume disappeared after creation, most likely due to inaccessible KMS encryption key")
} else {
// When DescribeVolumes api failed, plugin will lose track on the volumes' state
// driver should be able to clean up these kind of volumes to make sure they are not leaked on customers' account
klog.V(5).Infof("Failed to create the volume %v due to %v. Will try to delete it.", volumeName, err)
awsDisk, newDiskError := newAWSDisk(c, volumeName)
if newDiskError != nil {
klog.Errorf("Failed to delete the volume %v due to error: %v", volumeName, newDiskError)
} else {
if _, deleteVolumeError := awsDisk.deleteVolume(); deleteVolumeError != nil {
klog.Errorf("Failed to delete the volume %v due to error: %v", volumeName, deleteVolumeError)
} else {
klog.V(5).Infof("%v is deleted because it is not in desired state after waiting", volumeName)
}
}
}
return "", err
return KubernetesVolumeID(""), err
}
return volumeName, nil

View File

@ -73,6 +73,11 @@ func (m *MockedFakeEC2) DescribeVolumes(request *ec2.DescribeVolumesInput) ([]*e
return args.Get(0).([]*ec2.Volume), nil
}
func (m *MockedFakeEC2) DeleteVolume(request *ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error) {
args := m.Called(request)
return args.Get(0).(*ec2.DeleteVolumeOutput), nil
}
func (m *MockedFakeEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) {
args := m.Called(request)
return args.Get(0).([]*ec2.SecurityGroup), nil
@ -2363,6 +2368,51 @@ func TestCreateDisk(t *testing.T) {
awsServices.ec2.(*MockedFakeEC2).AssertExpectations(t)
}
func TestCreateDiskFailDescribeVolume(t *testing.T) {
awsServices := newMockedFakeAWSServices(TestClusterID)
c, _ := newAWSCloud(CloudConfig{}, awsServices)
volumeOptions := &VolumeOptions{
AvailabilityZone: "us-east-1a",
CapacityGB: 10,
}
request := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String("us-east-1a"),
Encrypted: aws.Bool(false),
VolumeType: aws.String(DefaultVolumeType),
Size: aws.Int64(10),
TagSpecifications: []*ec2.TagSpecification{
{ResourceType: aws.String(ec2.ResourceTypeVolume), Tags: []*ec2.Tag{
// CreateVolume from MockedFakeEC2 expects sorted tags, so we need to
// always have these tags sorted:
{Key: aws.String(TagNameKubernetesClusterLegacy), Value: aws.String(TestClusterID)},
{Key: aws.String(fmt.Sprintf("%s%s", TagNameKubernetesClusterPrefix, TestClusterID)), Value: aws.String(ResourceLifecycleOwned)},
}},
},
}
volume := &ec2.Volume{
AvailabilityZone: aws.String("us-east-1a"),
VolumeId: aws.String("vol-volumeId0"),
State: aws.String("creating"),
}
awsServices.ec2.(*MockedFakeEC2).On("CreateVolume", request).Return(volume, nil)
describeVolumesRequest := &ec2.DescribeVolumesInput{
VolumeIds: []*string{aws.String("vol-volumeId0")},
}
deleteVolumeRequest := &ec2.DeleteVolumeInput{
VolumeId: aws.String("vol-volumeId0"),
}
awsServices.ec2.(*MockedFakeEC2).On("DescribeVolumes", describeVolumesRequest).Return([]*ec2.Volume{volume}, nil)
awsServices.ec2.(*MockedFakeEC2).On("DeleteVolume", deleteVolumeRequest).Return(&ec2.DeleteVolumeOutput{}, nil)
volumeID, err := c.CreateDisk(volumeOptions)
assert.Error(t, err)
assert.Equal(t, volumeID, KubernetesVolumeID(""))
awsServices.ec2.(*MockedFakeEC2).AssertExpectations(t)
}
func TestRegionIsValid(t *testing.T) {
fake := newMockedFakeAWSServices("fakeCluster")
fake.selfInstance.Placement = &ec2.Placement{