Controller wait for attach and exponential backoff

Modify attach/detach controller to keep track of volumes to report
attached in Node VolumeToAttach status.

Modify kubelet volume manager to wait for volume to show up in Node
VolumeToAttach status.

Implement exponential backoff for errors in volume manager and attach
detach controller
This commit is contained in:
saadali
2016-06-15 23:48:04 -07:00
parent d72f88bf3a
commit e716ddc771
24 changed files with 1261 additions and 361 deletions

View File

@@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/volume"
@@ -49,6 +50,9 @@ import (
// Once the operation is started, since it is executed asynchronously,
// errors are simply logged and the goroutine is terminated without updating
// actualStateOfWorld (callers are responsible for retrying as needed).
//
// Some of these operations may result in calls to the API server; callers are
// responsible for rate limiting on errors.
type OperationExecutor interface {
// AttachVolume attaches the volume to the node specified in volumeToAttach.
// It then updates the actual state of the world to reflect that.
@@ -78,14 +82,29 @@ type OperationExecutor interface {
// attachable volumes only, freeing it for detach. It then updates the
// actual state of the world to reflect that.
UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
// VerifyControllerAttachedVolume checks if the specified volume is present
// in the specified nodes AttachedVolumes Status field. It uses kubeClient
// to fetch the node object.
// If the volume is found, the actual state of the world is updated to mark
// the volume as attached.
// If the volume does not implement the attacher interface, it is assumed to
// be attached and the the actual state of the world is updated accordingly.
// If the volume is not found or there is an error (fetching the node
// object, for example) then an error is returned which triggers exponential
// back off on retries.
VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName string, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
}
// NewOperationExecutor returns a new instance of OperationExecutor.
func NewOperationExecutor(
kubeClient internalclientset.Interface,
volumePluginMgr *volume.VolumePluginMgr) OperationExecutor {
return &operationExecutor{
volumePluginMgr: volumePluginMgr,
pendingOperations: goroutinemap.NewGoRoutineMap(),
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
pendingOperations: goroutinemap.NewGoRoutineMap(
true /* exponentialBackOffOnError */),
}
}
@@ -109,7 +128,7 @@ type ActualStateOfWorldMounterUpdater interface {
// actual state of the world cache after successful attach/detach/mount/unmount.
type ActualStateOfWorldAttacherUpdater interface {
// Marks the specified volume as attached to the specified node
MarkVolumeAsAttached(volumeSpec *volume.Spec, nodeName string) error
MarkVolumeAsAttached(volumeSpec *volume.Spec, nodeName string, devicePath string) error
// Marks the specified volume as detached from the specified node
MarkVolumeAsDetached(volumeName api.UniqueVolumeName, nodeName string)
@@ -160,6 +179,10 @@ type VolumeToMount struct {
// VolumeGidValue contains the value of the GID annotation, if present.
VolumeGidValue string
// DevicePath contains the path on the node where the volume is attached.
// For non-attachable volumes this is empty.
DevicePath string
}
// AttachedVolume represents a volume that is attached to a node.
@@ -284,9 +307,14 @@ type MountedVolume struct {
}
type operationExecutor struct {
// Used to fetch objects from the API server like Node in the
// VerifyControllerAttachedVolume operation.
kubeClient internalclientset.Interface
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
// pendingOperations keeps track of pending attach and detach operations so
// multiple operations are not started on the same volume
pendingOperations goroutinemap.GoRoutineMap
@@ -358,6 +386,20 @@ func (oe *operationExecutor) UnmountDevice(
string(deviceToDetach.VolumeName), unmountDeviceFunc)
}
func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount VolumeToMount,
nodeName string,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
verifyControllerAttachedVolumeFunc, err :=
oe.generateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
if err != nil {
return err
}
return oe.pendingOperations.Run(
string(volumeToMount.VolumeName), verifyControllerAttachedVolumeFunc)
}
func (oe *operationExecutor) generateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
@@ -385,18 +427,17 @@ func (oe *operationExecutor) generateAttachVolumeFunc(
return func() error {
// Execute attach
_, attachErr := volumeAttacher.Attach(
devicePath, attachErr := volumeAttacher.Attach(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
if attachErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"AttachVolume.Attach failed for volume %q (spec.Name: %q) from node %q with: %v",
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec.Name(),
volumeToAttach.NodeName,
attachErr)
return attachErr
}
glog.Infof(
@@ -407,16 +448,15 @@ func (oe *operationExecutor) generateAttachVolumeFunc(
// Update actual state of world
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
if addVolumeNodeErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"AttachVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) from node %q with: %v.",
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec.Name(),
volumeToAttach.NodeName,
addVolumeNodeErr)
return addVolumeNodeErr
}
return nil
@@ -463,14 +503,13 @@ func (oe *operationExecutor) generateDetachVolumeFunc(
// Execute detach
detachErr := volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
if detachErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"DetachVolume.Detach failed for volume %q (spec.Name: %q) from node %q with: %v",
volumeToDetach.VolumeName,
volumeToDetach.VolumeSpec.Name(),
volumeToDetach.NodeName,
detachErr)
return detachErr
}
glog.Infof(
@@ -543,16 +582,16 @@ func (oe *operationExecutor) generateMountVolumeFunc(
volumeToMount.Pod.UID)
devicePath, err := volumeAttacher.WaitForAttach(
volumeToMount.VolumeSpec, waitForAttachTimeout)
volumeToMount.VolumeSpec, volumeToMount.DevicePath, waitForAttachTimeout)
if err != nil {
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"MountVolume.WaitForAttach failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
err)
return err
}
glog.Infof(
@@ -565,14 +604,14 @@ func (oe *operationExecutor) generateMountVolumeFunc(
deviceMountPath, err :=
volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec)
if err != nil {
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"MountVolume.GetDeviceMountPath failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
err)
return err
}
// Mount device to global mount path
@@ -581,14 +620,14 @@ func (oe *operationExecutor) generateMountVolumeFunc(
devicePath,
deviceMountPath)
if err != nil {
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"MountVolume.MountDevice failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
err)
return err
}
glog.Infof(
@@ -602,30 +641,28 @@ func (oe *operationExecutor) generateMountVolumeFunc(
markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
volumeToMount.VolumeName)
if markDeviceMountedErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"MountVolume.MarkDeviceAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
markDeviceMountedErr)
return markDeviceMountedErr
}
}
// Execute mount
mountErr := volumeMounter.SetUp(fsGroup)
if mountErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"MountVolume.SetUp failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
mountErr)
return mountErr
}
glog.Infof(
@@ -644,15 +681,14 @@ func (oe *operationExecutor) generateMountVolumeFunc(
volumeToMount.OuterVolumeSpecName,
volumeToMount.VolumeGidValue)
if markVolMountedErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"MountVolume.MarkVolumeAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
markVolMountedErr)
return markVolMountedErr
}
return nil
@@ -691,15 +727,14 @@ func (oe *operationExecutor) generateUnmountVolumeFunc(
// Execute unmount
unmountErr := volumeUnmounter.TearDown()
if unmountErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"UnmountVolume.TearDown failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) with: %v",
volumeToUnmount.VolumeName,
volumeToUnmount.OuterVolumeSpecName,
volumeToUnmount.PodName,
volumeToUnmount.PodUID,
unmountErr)
return unmountErr
}
glog.Infof(
@@ -763,25 +798,23 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
deviceMountPath, err :=
volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec)
if err != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"GetDeviceMountPath failed for volume %q (spec.Name: %q) with: %v",
deviceToDetach.VolumeName,
deviceToDetach.VolumeSpec.Name(),
err)
return err
}
// Execute unmount
unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath)
if unmountDeviceErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"UnmountDevice failed for volume %q (spec.Name: %q) with: %v",
deviceToDetach.VolumeName,
deviceToDetach.VolumeSpec.Name(),
unmountDeviceErr)
return unmountDeviceErr
}
glog.Infof(
@@ -793,15 +826,95 @@ func (oe *operationExecutor) generateUnmountDeviceFunc(
markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
deviceToDetach.VolumeName)
if markDeviceUnmountedErr != nil {
// On failure, just log and exit. The controller will retry
glog.Errorf(
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"MarkDeviceAsUnmounted failed for device %q (spec.Name: %q) with: %v",
deviceToDetach.VolumeName,
deviceToDetach.VolumeSpec.Name(),
markDeviceUnmountedErr)
return markDeviceUnmountedErr
}
return nil
}, nil
}
func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
volumeToMount VolumeToMount,
nodeName string,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
return func() error {
if !volumeToMount.PluginIsAttachable {
// If the volume does not implement the attacher interface, it is
// assumed to be attached and the the actual state of the world is
// updated accordingly.
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeSpec, nodeName, volumeToMount.DevicePath)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"VerifyControllerAttachedVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
addVolumeNodeErr)
}
return nil
}
// Fetch current node object
node, fetchErr := oe.kubeClient.Core().Nodes().Get(nodeName)
if fetchErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"VerifyControllerAttachedVolume failed fetching node from API server. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: %v.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
fetchErr)
}
if node == nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"VerifyControllerAttachedVolume failed. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: node object retrieved from API server is nil.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}
for _, attachedVolume := range node.Status.VolumesAttached {
if attachedVolume.Name == volumeToMount.VolumeName {
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeSpec, nodeName, volumeToMount.DevicePath)
glog.Infof("Controller successfully attached volume %q (spec.Name: %q) pod %q (UID: %q)",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"VerifyControllerAttachedVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID,
addVolumeNodeErr)
}
return nil
}
}
// Volume not attached, return error. Caller will log and retry.
return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) is not yet attached according to node status.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
volumeToMount.Pod.UID)
}, nil
}