Merge pull request #93567 from gnufied/fix-stale-attachments

Make AttachDisk for EBS idempotent again
Authored by Kubernetes Prow Robot on 2020-08-07 07:16:19 -07:00; committed by GitHub
commit a640545167
4 changed files with 353 additions and 8 deletions
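
In short: getMountDevice stops returning an error for a volume the instance already reports as attached and instead returns the existing device with alreadyAttached=true, and waitForAttachmentStatus gains that flag so a stale attachment reported by DescribeInstances but contradicted by DescribeVolumes fails fast instead of waiting out the full backoff (roughly 20 minutes). A minimal sketch of the resulting retry behavior, using hypothetical names rather than the provider's actual internals:

package main

import "fmt"

// deviceMappings stands in for the block-device mappings that
// DescribeInstances reports for a node (hypothetical sample data).
var deviceMappings = map[string]string{"ba": "vol-0123"}

// getMountDevice mirrors the fixed behavior: a volume that is already
// mapped is reported as alreadyAttached rather than treated as an error.
func getMountDevice(volumeID string) (device string, alreadyAttached bool) {
	for dev, vol := range deviceMappings {
		if vol == volumeID {
			return dev, true // idempotent: reuse the existing assignment
		}
	}
	return "bb", false // pretend "bb" is the next free device
}

// attachDisk sketches the caller: it only issues a new attach request for
// volumes that are not already attached, so a retry converges on success.
func attachDisk(volumeID string) string {
	device, alreadyAttached := getMountDevice(volumeID)
	if !alreadyAttached {
		// a real implementation would call EC2 AttachVolume here
	}
	return "/dev/xvd" + device
}

func main() {
	// A retried AttachDisk now returns the same device instead of failing
	// with "volume is still being detached from the node".
	fmt.Println(attachDisk("vol-0123"))
	fmt.Println(attachDisk("vol-0123"))
}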

staging/src/k8s.io/legacy-cloud-providers/aws/aws.go

@ -257,6 +257,14 @@ const (
filterNodeLimit = 150
)
const (
// represents expected attachment status of a volume after attach
volumeAttachedStatus = "attached"
// represents expected attachment status of a volume after detach
volumeDetachedStatus = "detached"
)
// awsTagNameMasterRoles is a set of well-known AWS tag names that indicate the instance is a master
// The major consequence is that it is then not considered for AWS zone discovery for dynamic volume creation.
var awsTagNameMasterRoles = sets.NewString("kubernetes.io/role/master", "k8s.io/role/master")
@ -1943,7 +1951,6 @@ func (c *Cloud) getMountDevice(
// AWS API returns a consistent result next time (i.e. the volume is detached).
status := volumeStatus[mappingVolumeID]
klog.Warningf("Got assignment call for already-assigned volume: %s@%s, volume status: %s", mountDevice, mappingVolumeID, status)
-return mountDevice, false, fmt.Errorf("volume is still being detached from the node")
}
return mountDevice, true, nil
}
@ -2144,7 +2151,7 @@ func (c *Cloud) applyUnSchedulableTaint(nodeName types.NodeName, reason string)
// waitForAttachmentStatus polls until the attachment status is the expected value.
// On success, it returns the last attachment state.
-func (d *awsDisk) waitForAttachmentStatus(status string, expectedInstance, expectedDevice string) (*ec2.VolumeAttachment, error) {
+func (d *awsDisk) waitForAttachmentStatus(status string, expectedInstance, expectedDevice string, alreadyAttached bool) (*ec2.VolumeAttachment, error) {
backoff := wait.Backoff{
Duration: volumeAttachmentStatusPollDelay,
Factor: volumeAttachmentStatusFactor,
@ -2169,7 +2176,7 @@ func (d *awsDisk) waitForAttachmentStatus(status string, expectedInstance, expec
if err != nil {
// The VolumeNotFound error is special -- we don't need to wait for it to repeat
if isAWSErrorVolumeNotFound(err) {
-if status == "detached" {
+if status == volumeDetachedStatus {
// The disk doesn't exist, assume it's detached, log warning and stop waiting
klog.Warningf("Waiting for volume %q to be detached but the volume does not exist", d.awsID)
stateStr := "detached"
@ -2178,7 +2185,7 @@ func (d *awsDisk) waitForAttachmentStatus(status string, expectedInstance, expec
}
return true, nil
}
-if status == "attached" {
+if status == volumeAttachedStatus {
// The disk doesn't exist, complain, give up waiting and report error
klog.Warningf("Waiting for volume %q to be attached but the volume does not exist", d.awsID)
return false, err
@ -2213,7 +2220,7 @@ func (d *awsDisk) waitForAttachmentStatus(status string, expectedInstance, expec
}
}
if attachmentStatus == "" {
-attachmentStatus = "detached"
+attachmentStatus = volumeDetachedStatus
}
if attachment != nil {
// AWS eventual consistency can go back in time.
@ -2242,6 +2249,13 @@ func (d *awsDisk) waitForAttachmentStatus(status string, expectedInstance, expec
}
}
// If we expected the volume to be attached and it was reported as already attached via the DescribeInstances call,
// but DescribeVolumes told us the volume is detached, we short-circuit this long wait loop and return an error
// so that AttachDisk can be retried without waiting for 20 minutes.
if (status == volumeAttachedStatus) && alreadyAttached && (attachmentStatus != status) {
return false, fmt.Errorf("attachment of disk %q failed, expected device to be attached but was %s", d.name, attachmentStatus)
}
if attachmentStatus == status {
// Attachment is in requested state, finish waiting
return true, nil
@ -2387,7 +2401,7 @@ func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
klog.V(2).Infof("AttachVolume volume=%q instance=%q request returned %v", disk.awsID, awsInstance.awsID, attachResponse)
}
-attachment, err := disk.waitForAttachmentStatus("attached", awsInstance.awsID, ec2Device)
+attachment, err := disk.waitForAttachmentStatus("attached", awsInstance.awsID, ec2Device, alreadyAttached)
if err != nil {
if err == wait.ErrWaitTimeout {
@ -2465,7 +2479,7 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
return "", errors.New("no response from DetachVolume")
}
-attachment, err := diskInfo.disk.waitForAttachmentStatus("detached", awsInstance.awsID, "")
+attachment, err := diskInfo.disk.waitForAttachmentStatus("detached", awsInstance.awsID, "", false)
if err != nil {
return "", err
}
@ -4773,7 +4787,7 @@ func setNodeDisk(
}
func getInitialAttachDetachDelay(status string) time.Duration {
-if status == "detached" {
+if status == volumeDetachedStatus {
return volumeDetachmentStatusInitialDelay
}
return volumeAttachmentStatusInitialDelay
}
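
The new fail-fast path relies on a contract of wait.ExponentialBackoff from k8s.io/apimachinery: a non-nil error from the condition function aborts the loop immediately, while (false, nil) keeps polling until the backoff steps are exhausted. A small self-contained illustration of that contract; the error text mirrors the one added above:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	backoff := wait.Backoff{Duration: 100 * time.Millisecond, Factor: 2, Steps: 10}
	attempts := 0
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempts++
		// Returning an error, as the new alreadyAttached check does, stops
		// the wait immediately; returning (false, nil) would keep retrying.
		return false, fmt.Errorf("expected device to be attached but was detached")
	})
	fmt.Printf("attempts=%d, err=%v\n", attempts, err) // attempts=1, not 10
}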

test/e2e/storage/pd.go

@ -451,6 +451,37 @@ var _ = utils.SIGDescribe("Pod Disks", func() {
ginkgo.By("delete a PD")
framework.ExpectNoError(e2epv.DeletePDWithRetry("non-exist"))
})
// This test is marked Serial so that device selection on AWS does not
// conflict with other concurrent attach operations.
ginkgo.It("[Serial] attach on previously attached volumes should work", func() {
e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
ginkgo.By("creating PD")
diskName, err := e2epv.CreatePDWithRetry()
framework.ExpectNoError(err, "Error creating PD")
// This should be safe to do because if attach fails, detach will be considered
// successful and we will delete the volume.
defer func() {
detachAndDeletePDs(diskName, []types.NodeName{host0Name})
}()
ginkgo.By("Attaching volume to a node")
err = attachPD(host0Name, diskName)
framework.ExpectNoError(err, "Error attaching PD")
pod := testPDPod([]string{diskName}, host0Name, false /*readOnly*/, 1)
ginkgo.By("Creating test pod with same volume")
_, err = podClient.Create(context.TODO(), pod, metav1.CreateOptions{})
framework.ExpectNoError(err, "Failed to create pod")
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespaceSlow(f.ClientSet, pod.Name, f.Namespace.Name))
ginkgo.By("deleting the pod")
framework.ExpectNoError(podClient.Delete(context.TODO(), pod.Name, *metav1.NewDeleteOptions(0)), "Failed to delete pod")
framework.Logf("deleted pod %q", pod.Name)
ginkgo.By("waiting for PD to detach")
framework.ExpectNoError(waitForPDDetach(diskName, host0Name))
})
})
func countReadyNodes(c clientset.Interface, hostName types.NodeName) int {
@ -474,6 +505,7 @@ func verifyPDContentsViaContainer(namespace string, f *framework.Framework, podN
}
}
// TODO: move detachPD to standard cloudprovider functions so that these tests can run on other cloud providers too
func detachPD(nodeName types.NodeName, pdName string) error {
if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
gceCloud, err := gce.GetGCECloud()
@ -512,6 +544,38 @@ func detachPD(nodeName types.NodeName, pdName string) error {
}
}
// TODO: move attachPD to standard cloudprovider functions so that these tests can run on other cloud providers too
func attachPD(nodeName types.NodeName, pdName string) error {
if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
gceCloud, err := gce.GetGCECloud()
if err != nil {
return err
}
err = gceCloud.AttachDisk(pdName, nodeName, false /*readOnly*/, false /*regional*/)
if err != nil {
framework.Logf("Error attaching PD %q: %v", pdName, err)
}
return err
} else if framework.TestContext.Provider == "aws" {
awsSession, err := session.NewSession()
if err != nil {
return fmt.Errorf("error creating session: %v", err)
}
client := ec2.New(awsSession)
tokens := strings.Split(pdName, "/")
awsVolumeID := tokens[len(tokens)-1]
ebsUtil := utils.NewEBSUtil(client)
err = ebsUtil.AttachDisk(awsVolumeID, string(nodeName))
if err != nil {
return fmt.Errorf("error attaching volume %s to node %s: %v", awsVolumeID, nodeName, err)
}
return nil
} else {
return fmt.Errorf("provider does not support volume attaching")
}
}
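
attachPD above assumes the EC2 volume ID is the last "/"-separated segment of the PD name; AWS volume handles in Kubernetes typically look like aws://<zone>/<vol-id>, with an optional empty zone. A small sketch of that extraction (the sample handle is illustrative):

package main

import (
	"fmt"
	"strings"
)

// awsVolumeID reduces a Kubernetes AWS volume handle to the bare EC2
// VolumeId by taking the last "/"-separated token, exactly as attachPD does.
func awsVolumeID(pdName string) string {
	tokens := strings.Split(pdName, "/")
	return tokens[len(tokens)-1]
}

func main() {
	fmt.Println(awsVolumeID("aws://us-east-1a/vol-0abc123")) // vol-0abc123
	fmt.Println(awsVolumeID("vol-0abc123"))                  // bare IDs pass through unchanged
}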
// Returns pod spec suitable for api Create call. Handles gce, gke and aws providers only and
// escapes if a different provider is supplied.
// The first container name is hard-coded to "mycontainer". Subsequent containers are named:

test/e2e/storage/utils/BUILD

@ -7,6 +7,7 @@ go_library(
srcs = [
"create.go",
"deployment.go",
"ebs.go",
"framework.go",
"host_exec.go",
"local.go",
@ -36,9 +37,12 @@ go_library(
"//test/e2e/framework/ssh:go_default_library",
"//test/e2e/framework/testfiles:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
)

test/e2e/storage/utils/ebs.go

@ -0,0 +1,263 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"fmt"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
const (
volumeAttachmentStatusPollDelay = 2 * time.Second
volumeAttachmentStatusFactor = 2
volumeAttachmentStatusSteps = 6
// represents expected attachment status of a volume after attach
volumeAttachedStatus = "attached"
// represents expected attachment status of a volume after detach
volumeDetachedStatus = "detached"
)
// EBSUtil provides functions to interact with EBS volumes
type EBSUtil struct {
client *ec2.EC2
validDevices []string
}
// NewEBSUtil returns an instance of EBSUtil which can be used to
// interact with EBS volumes.
func NewEBSUtil(client *ec2.EC2) *EBSUtil {
ebsUtil := &EBSUtil{client: client}
validDevices := []string{}
for _, firstChar := range []rune{'b', 'c'} {
for i := 'a'; i <= 'z'; i++ {
dev := string([]rune{firstChar, i})
validDevices = append(validDevices, dev)
}
}
ebsUtil.validDevices = validDevices
return ebsUtil
}
// AttachDisk attaches an EBS volume to a node.
func (ebs *EBSUtil) AttachDisk(volumeID string, nodeName string) error {
instance, err := findInstanceByNodeName(nodeName, ebs.client)
if err != nil {
return fmt.Errorf("error finding node %s: %v", nodeName, err)
}
err = ebs.waitForAvailable(volumeID)
if err != nil {
return fmt.Errorf("error waiting volume %s to be available: %v", volumeID, err)
}
device, err := ebs.findFreeDevice(instance)
if err != nil {
return fmt.Errorf("error finding free device on node %s: %v", nodeName, err)
}
hostDevice := "/dev/xvd" + device
attachInput := &ec2.AttachVolumeInput{
VolumeId: &volumeID,
InstanceId: instance.InstanceId,
Device: &hostDevice,
}
_, err = ebs.client.AttachVolume(attachInput)
if err != nil {
return fmt.Errorf("error attaching volume %s to node %s: %v", volumeID, nodeName, err)
}
return ebs.waitForAttach(volumeID)
}
func (ebs *EBSUtil) findFreeDevice(instance *ec2.Instance) (string, error) {
deviceMappings := map[string]string{}
for _, blockDevice := range instance.BlockDeviceMappings {
name := aws.StringValue(blockDevice.DeviceName)
name = strings.TrimPrefix(name, "/dev/sd")
name = strings.TrimPrefix(name, "/dev/xvd")
if len(name) < 1 || len(name) > 2 {
klog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
}
deviceMappings[name] = aws.StringValue(blockDevice.Ebs.VolumeId)
}
for _, device := range ebs.validDevices {
if _, found := deviceMappings[device]; !found {
return device, nil
}
}
return "", fmt.Errorf("no available device")
}
func (ebs *EBSUtil) waitForAttach(volumeID string) error {
backoff := wait.Backoff{
Duration: volumeAttachmentStatusPollDelay,
Factor: volumeAttachmentStatusFactor,
Steps: volumeAttachmentStatusSteps,
}
time.Sleep(volumeAttachmentStatusPollDelay)
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
info, err := ebs.describeVolume(volumeID)
if err != nil {
return false, err
}
if len(info.Attachments) > 1 {
// Shouldn't happen; log so we know if it is
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, info)
}
attachmentStatus := ""
for _, a := range info.Attachments {
if attachmentStatus != "" {
// Shouldn't happen; log so we know if it is
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, info)
}
if a.State != nil {
attachmentStatus = *a.State
} else {
// Shouldn't happen; log so we know if it is
klog.Warningf("Ignoring nil attachment state for volume %q: %v", volumeID, a)
}
}
if attachmentStatus == "" {
attachmentStatus = volumeDetachedStatus
}
if attachmentStatus == volumeAttachedStatus {
// Attachment is in requested state, finish waiting
return true, nil
}
return false, nil
})
return err
}
func (ebs *EBSUtil) waitForAvailable(volumeID string) error {
backoff := wait.Backoff{
Duration: volumeAttachmentStatusPollDelay,
Factor: volumeAttachmentStatusFactor,
Steps: volumeAttachmentStatusSteps,
}
time.Sleep(volumeAttachmentStatusPollDelay)
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
info, err := ebs.describeVolume(volumeID)
if err != nil {
return false, err
}
volumeState := aws.StringValue(info.State)
if volumeState != ec2.VolumeStateAvailable {
return false, nil
}
return true, nil
})
return err
}
// describeVolume gets the full information about this volume from the EC2 API.
func (ebs *EBSUtil) describeVolume(volumeID string) (*ec2.Volume, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{&volumeID},
}
results := []*ec2.Volume{}
var nextToken *string
for {
response, err := ebs.client.DescribeVolumes(request)
if err != nil {
return nil, err
}
results = append(results, response.Volumes...)
nextToken = response.NextToken
if aws.StringValue(nextToken) == "" {
break
}
request.NextToken = nextToken
}
if len(results) == 0 {
return nil, fmt.Errorf("no volumes found")
}
if len(results) > 1 {
return nil, fmt.Errorf("multiple volumes found")
}
return results[0], nil
}
func newEc2Filter(name string, value string) *ec2.Filter {
filter := &ec2.Filter{
Name: aws.String(name),
Values: []*string{
aws.String(value),
},
}
return filter
}
func findInstanceByNodeName(nodeName string, cloud *ec2.EC2) (*ec2.Instance, error) {
filters := []*ec2.Filter{
newEc2Filter("private-dns-name", nodeName),
}
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := describeInstances(request, cloud)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, fmt.Errorf("no instance found for node name %q", nodeName)
}
if len(instances) > 1 {
return nil, fmt.Errorf("multiple instances found for name: %s", nodeName)
}
return instances[0], nil
}
func describeInstances(request *ec2.DescribeInstancesInput, cloud *ec2.EC2) ([]*ec2.Instance, error) {
// Instances are paged
results := []*ec2.Instance{}
var nextToken *string
for {
response, err := cloud.DescribeInstances(request)
if err != nil {
return nil, fmt.Errorf("error listing AWS instances: %v", err)
}
for _, reservation := range response.Reservations {
results = append(results, reservation.Instances...)
}
nextToken = response.NextToken
if nextToken == nil || len(*nextToken) == 0 {
break
}
request.NextToken = nextToken
}
return results, nil
}