mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-05 15:37:24 +00:00
Refactor operation_executor to make it unit testable
This commit is contained in:
@@ -21,17 +21,10 @@ limitations under the License.
|
||||
package operationexecutor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
kevents "k8s.io/kubernetes/pkg/kubelet/events"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
@@ -119,18 +112,12 @@ type OperationExecutor interface {
|
||||
|
||||
// NewOperationExecutor returns a new instance of OperationExecutor.
|
||||
func NewOperationExecutor(
|
||||
kubeClient clientset.Interface,
|
||||
volumePluginMgr *volume.VolumePluginMgr,
|
||||
recorder record.EventRecorder,
|
||||
checkNodeCapabilitiesBeforeMount bool) OperationExecutor {
|
||||
operationGenerator OperationGenerator) OperationExecutor {
|
||||
|
||||
return &operationExecutor{
|
||||
kubeClient: kubeClient,
|
||||
volumePluginMgr: volumePluginMgr,
|
||||
pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
|
||||
true /* exponentialBackOffOnError */),
|
||||
recorder: recorder,
|
||||
checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount,
|
||||
operationGenerator: operationGenerator,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -361,25 +348,13 @@ type MountedVolume struct {
|
||||
}
|
||||
|
||||
type operationExecutor struct {
|
||||
// Used to fetch objects from the API server like Node in the
|
||||
// VerifyControllerAttachedVolume operation.
|
||||
kubeClient clientset.Interface
|
||||
|
||||
// volumePluginMgr is the volume plugin manager used to create volume
|
||||
// plugin objects.
|
||||
volumePluginMgr *volume.VolumePluginMgr
|
||||
|
||||
// pendingOperations keeps track of pending attach and detach operations so
|
||||
// multiple operations are not started on the same volume
|
||||
pendingOperations nestedpendingoperations.NestedPendingOperations
|
||||
|
||||
// recorder is used to record events in the API server
|
||||
recorder record.EventRecorder
|
||||
|
||||
// checkNodeCapabilitiesBeforeMount, if set, enables the CanMount check,
|
||||
// which verifies that the components (binaries, etc.) required to mount
|
||||
// the volume are available on the underlying node before attempting mount.
|
||||
checkNodeCapabilitiesBeforeMount bool
|
||||
// operationGenerator is an interface that provides implementations for
|
||||
// generating volume function
|
||||
operationGenerator OperationGenerator
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
|
||||
@@ -390,7 +365,7 @@ func (oe *operationExecutor) AttachVolume(
|
||||
volumeToAttach VolumeToAttach,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
|
||||
attachFunc, err :=
|
||||
oe.generateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
|
||||
oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -404,7 +379,7 @@ func (oe *operationExecutor) DetachVolume(
|
||||
verifySafeToDetach bool,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
|
||||
detachFunc, err :=
|
||||
oe.generateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
|
||||
oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -418,7 +393,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
|
||||
nodeName types.NodeName,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
|
||||
volumesAreAttachedFunc, err :=
|
||||
oe.generateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
|
||||
oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -430,7 +405,7 @@ func (oe *operationExecutor) MountVolume(
|
||||
waitForAttachTimeout time.Duration,
|
||||
volumeToMount VolumeToMount,
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
||||
mountFunc, err := oe.generateMountVolumeFunc(
|
||||
mountFunc, err := oe.operationGenerator.GenerateMountVolumeFunc(
|
||||
waitForAttachTimeout, volumeToMount, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -453,7 +428,7 @@ func (oe *operationExecutor) UnmountVolume(
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
|
||||
|
||||
unmountFunc, err :=
|
||||
oe.generateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
|
||||
oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -471,7 +446,7 @@ func (oe *operationExecutor) UnmountDevice(
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater,
|
||||
mounter mount.Interface) error {
|
||||
unmountDeviceFunc, err :=
|
||||
oe.generateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
|
||||
oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -485,7 +460,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
||||
nodeName types.NodeName,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
|
||||
verifyControllerAttachedVolumeFunc, err :=
|
||||
oe.generateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
|
||||
oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -494,637 +469,8 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
||||
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc)
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) generateVolumesAreAttachedFunc(
|
||||
attachedVolumes []AttachedVolume,
|
||||
nodeName types.NodeName,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
|
||||
// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong
|
||||
// to this type of plugin
|
||||
volumesPerPlugin := make(map[string][]*volume.Spec)
|
||||
// volumeSpecMap maps from a volume spec to its unique volumeName which will be used
|
||||
// when calling MarkVolumeAsDetached
|
||||
volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName)
|
||||
// Iterate each volume spec and put them into a map index by the pluginName
|
||||
for _, volumeAttached := range attachedVolumes {
|
||||
volumePlugin, err :=
|
||||
oe.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
|
||||
if err != nil || volumePlugin == nil {
|
||||
glog.Errorf(
|
||||
"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
|
||||
volumeAttached.VolumeName,
|
||||
volumeAttached.VolumeSpec.Name(),
|
||||
volumeAttached.NodeName,
|
||||
err)
|
||||
}
|
||||
volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
|
||||
if !pluginExists {
|
||||
volumeSpecList = []*volume.Spec{}
|
||||
}
|
||||
volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
|
||||
volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
|
||||
volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
|
||||
}
|
||||
|
||||
return func() error {
|
||||
|
||||
// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check
|
||||
// whether the volumes are still attached.
|
||||
for pluginName, volumesSpecs := range volumesPerPlugin {
|
||||
attachableVolumePlugin, err :=
|
||||
oe.volumePluginMgr.FindAttachablePluginByName(pluginName)
|
||||
if err != nil || attachableVolumePlugin == nil {
|
||||
glog.Errorf(
|
||||
"VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v",
|
||||
pluginName,
|
||||
err)
|
||||
continue
|
||||
}
|
||||
|
||||
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
|
||||
if newAttacherErr != nil {
|
||||
glog.Errorf(
|
||||
"VolumesAreAttached failed for getting plugin %q with: %v",
|
||||
pluginName,
|
||||
newAttacherErr)
|
||||
continue
|
||||
}
|
||||
|
||||
attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName)
|
||||
if areAttachedErr != nil {
|
||||
glog.Errorf(
|
||||
"VolumesAreAttached failed for checking on node %q with: %v",
|
||||
nodeName,
|
||||
areAttachedErr)
|
||||
continue
|
||||
}
|
||||
|
||||
for spec, check := range attached {
|
||||
if !check {
|
||||
actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName)
|
||||
glog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.",
|
||||
volumeSpecMap[spec], spec.Name(), nodeName)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) generateAttachVolumeFunc(
|
||||
volumeToAttach VolumeToAttach,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
// Get attacher plugin
|
||||
attachableVolumePlugin, err :=
|
||||
oe.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
|
||||
if err != nil || attachableVolumePlugin == nil {
|
||||
return nil, fmt.Errorf(
|
||||
"AttachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToAttach.VolumeName,
|
||||
volumeToAttach.VolumeSpec.Name(),
|
||||
volumeToAttach.NodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
|
||||
if newAttacherErr != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"AttachVolume.NewAttacher failed for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToAttach.VolumeName,
|
||||
volumeToAttach.VolumeSpec.Name(),
|
||||
volumeToAttach.NodeName,
|
||||
newAttacherErr)
|
||||
}
|
||||
|
||||
return func() error {
|
||||
// Execute attach
|
||||
devicePath, attachErr := volumeAttacher.Attach(
|
||||
volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
|
||||
|
||||
if attachErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
err := fmt.Errorf(
|
||||
"Failed to attach volume %q on node %q with: %v",
|
||||
volumeToAttach.VolumeSpec.Name(),
|
||||
volumeToAttach.NodeName,
|
||||
attachErr)
|
||||
for _, pod := range volumeToAttach.ScheduledPods {
|
||||
oe.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
glog.Infof(
|
||||
"AttachVolume.Attach succeeded for volume %q (spec.Name: %q) from node %q.",
|
||||
volumeToAttach.VolumeName,
|
||||
volumeToAttach.VolumeSpec.Name(),
|
||||
volumeToAttach.NodeName)
|
||||
|
||||
// Update actual state of world
|
||||
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
|
||||
v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
|
||||
if addVolumeNodeErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"AttachVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToAttach.VolumeName,
|
||||
volumeToAttach.VolumeSpec.Name(),
|
||||
volumeToAttach.NodeName,
|
||||
addVolumeNodeErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) generateDetachVolumeFunc(
|
||||
volumeToDetach AttachedVolume,
|
||||
verifySafeToDetach bool,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
// Get attacher plugin
|
||||
attachableVolumePlugin, err :=
|
||||
oe.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
|
||||
if err != nil || attachableVolumePlugin == nil {
|
||||
return nil, fmt.Errorf(
|
||||
"DetachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
volumeName, err :=
|
||||
attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"DetachVolume.GetVolumeName failed for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
volumeDetacher, err := attachableVolumePlugin.NewDetacher()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"DetachVolume.NewDetacher failed for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
return func() error {
|
||||
var err error
|
||||
if verifySafeToDetach {
|
||||
err = oe.verifyVolumeIsSafeToDetach(volumeToDetach)
|
||||
}
|
||||
if err == nil {
|
||||
err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
|
||||
}
|
||||
if err != nil {
|
||||
// On failure, add volume back to ReportAsAttached list
|
||||
actualStateOfWorld.AddVolumeToReportAsAttached(
|
||||
volumeToDetach.VolumeName, volumeToDetach.NodeName)
|
||||
return fmt.Errorf(
|
||||
"DetachVolume.Detach failed for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName,
|
||||
err)
|
||||
}
|
||||
|
||||
glog.Infof(
|
||||
"DetachVolume.Detach succeeded for volume %q (spec.Name: %q) from node %q.",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName)
|
||||
|
||||
// Update actual state of world
|
||||
actualStateOfWorld.MarkVolumeAsDetached(
|
||||
volumeToDetach.VolumeName, volumeToDetach.NodeName)
|
||||
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) verifyVolumeIsSafeToDetach(
|
||||
volumeToDetach AttachedVolume) error {
|
||||
// Fetch current node object
|
||||
node, fetchErr := oe.kubeClient.Core().Nodes().Get(string(volumeToDetach.NodeName), metav1.GetOptions{})
|
||||
if fetchErr != nil {
|
||||
if errors.IsNotFound(fetchErr) {
|
||||
glog.Warningf("Node %q not found on API server. DetachVolume will skip safe to detach check.",
|
||||
volumeToDetach.NodeName,
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name())
|
||||
return nil
|
||||
}
|
||||
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName,
|
||||
fetchErr)
|
||||
}
|
||||
|
||||
if node == nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName)
|
||||
}
|
||||
|
||||
for _, inUseVolume := range node.Status.VolumesInUse {
|
||||
if inUseVolume == volumeToDetach.VolumeName {
|
||||
return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName)
|
||||
}
|
||||
}
|
||||
|
||||
// Volume is not marked as in use by node
|
||||
glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.",
|
||||
volumeToDetach.VolumeName,
|
||||
volumeToDetach.VolumeSpec.Name(),
|
||||
volumeToDetach.NodeName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) generateMountVolumeFunc(
|
||||
waitForAttachTimeout time.Duration,
|
||||
volumeToMount VolumeToMount,
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||
// Get mounter plugin
|
||||
volumePlugin, err :=
|
||||
oe.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
|
||||
if err != nil || volumePlugin == nil {
|
||||
return nil, fmt.Errorf(
|
||||
"MountVolume.FindPluginBySpec failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
err)
|
||||
}
|
||||
|
||||
volumeMounter, newMounterErr := volumePlugin.NewMounter(
|
||||
volumeToMount.VolumeSpec,
|
||||
volumeToMount.Pod,
|
||||
volume.VolumeOptions{})
|
||||
if newMounterErr != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"MountVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
newMounterErr)
|
||||
}
|
||||
|
||||
// Get attacher, if possible
|
||||
attachableVolumePlugin, _ :=
|
||||
oe.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
|
||||
var volumeAttacher volume.Attacher
|
||||
if attachableVolumePlugin != nil {
|
||||
volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
|
||||
}
|
||||
|
||||
var fsGroup *int64
|
||||
if volumeToMount.Pod.Spec.SecurityContext != nil &&
|
||||
volumeToMount.Pod.Spec.SecurityContext.FSGroup != nil {
|
||||
fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup
|
||||
}
|
||||
|
||||
return func() error {
|
||||
if volumeAttacher != nil {
|
||||
// Wait for attachable volumes to finish attaching
|
||||
glog.Infof(
|
||||
"Entering MountVolume.WaitForAttach for volume %q (spec.Name: %q) pod %q (UID: %q) DevicePath: %q",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
volumeToMount.DevicePath)
|
||||
|
||||
devicePath, err := volumeAttacher.WaitForAttach(
|
||||
volumeToMount.VolumeSpec, volumeToMount.DevicePath, waitForAttachTimeout)
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"MountVolume.WaitForAttach failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
err)
|
||||
}
|
||||
|
||||
glog.Infof(
|
||||
"MountVolume.WaitForAttach succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID)
|
||||
|
||||
deviceMountPath, err :=
|
||||
volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec)
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"MountVolume.GetDeviceMountPath failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
err)
|
||||
}
|
||||
|
||||
// Mount device to global mount path
|
||||
err = volumeAttacher.MountDevice(
|
||||
volumeToMount.VolumeSpec,
|
||||
devicePath,
|
||||
deviceMountPath)
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
err := fmt.Errorf(
|
||||
"MountVolume.MountDevice failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
err)
|
||||
oe.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
glog.Infof(
|
||||
"MountVolume.MountDevice succeeded for volume %q (spec.Name: %q) pod %q (UID: %q) device mount path %q",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
deviceMountPath)
|
||||
|
||||
// Update actual state of world to reflect volume is globally mounted
|
||||
markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
|
||||
volumeToMount.VolumeName)
|
||||
if markDeviceMountedErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"MountVolume.MarkDeviceAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
markDeviceMountedErr)
|
||||
}
|
||||
}
|
||||
|
||||
if oe.checkNodeCapabilitiesBeforeMount {
|
||||
if canMountErr := volumeMounter.CanMount(); canMountErr != nil {
|
||||
errMsg := fmt.Sprintf("Unable to mount volume %v (spec.Name: %v) on pod %v (UID: %v). Verify that your node machine has the required components before attempting to mount this volume type. %s", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.Pod.Name, volumeToMount.Pod.UID, canMountErr.Error())
|
||||
oe.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, errMsg)
|
||||
glog.Errorf(errMsg)
|
||||
return fmt.Errorf(errMsg)
|
||||
}
|
||||
}
|
||||
|
||||
// Execute mount
|
||||
mountErr := volumeMounter.SetUp(fsGroup)
|
||||
if mountErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
err := fmt.Errorf(
|
||||
"MountVolume.SetUp failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
mountErr)
|
||||
oe.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
glog.Infof(
|
||||
"MountVolume.SetUp succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID)
|
||||
|
||||
// Update actual state of world
|
||||
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
volumeToMount.VolumeName,
|
||||
volumeMounter,
|
||||
volumeToMount.OuterVolumeSpecName,
|
||||
volumeToMount.VolumeGidValue)
|
||||
if markVolMountedErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"MountVolume.MarkVolumeAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
markVolMountedErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) generateUnmountVolumeFunc(
|
||||
volumeToUnmount MountedVolume,
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) {
|
||||
// Get mountable plugin
|
||||
volumePlugin, err :=
|
||||
oe.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName)
|
||||
if err != nil || volumePlugin == nil {
|
||||
return nil, fmt.Errorf(
|
||||
"UnmountVolume.FindPluginByName failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) err=%v",
|
||||
volumeToUnmount.VolumeName,
|
||||
volumeToUnmount.OuterVolumeSpecName,
|
||||
volumeToUnmount.PodName,
|
||||
volumeToUnmount.PodUID,
|
||||
err)
|
||||
}
|
||||
|
||||
volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter(
|
||||
volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
|
||||
if newUnmounterErr != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"UnmountVolume.NewUnmounter failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) err=%v",
|
||||
volumeToUnmount.VolumeName,
|
||||
volumeToUnmount.OuterVolumeSpecName,
|
||||
volumeToUnmount.PodName,
|
||||
volumeToUnmount.PodUID,
|
||||
newUnmounterErr)
|
||||
}
|
||||
|
||||
return func() error {
|
||||
// Execute unmount
|
||||
unmountErr := volumeUnmounter.TearDown()
|
||||
if unmountErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"UnmountVolume.TearDown failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToUnmount.VolumeName,
|
||||
volumeToUnmount.OuterVolumeSpecName,
|
||||
volumeToUnmount.PodName,
|
||||
volumeToUnmount.PodUID,
|
||||
unmountErr)
|
||||
}
|
||||
|
||||
glog.Infof(
|
||||
"UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
|
||||
volumeToUnmount.VolumeName,
|
||||
volumeToUnmount.OuterVolumeSpecName,
|
||||
volumeToUnmount.PodName,
|
||||
volumeToUnmount.PodUID,
|
||||
volumeToUnmount.InnerVolumeSpecName,
|
||||
volumeToUnmount.PluginName,
|
||||
volumeToUnmount.VolumeGidValue)
|
||||
|
||||
// Update actual state of world
|
||||
markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
|
||||
volumeToUnmount.PodName, volumeToUnmount.VolumeName)
|
||||
if markVolMountedErr != nil {
|
||||
// On failure, just log and exit
|
||||
glog.Errorf(
|
||||
"UnmountVolume.MarkVolumeAsUnmounted failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToUnmount.VolumeName,
|
||||
volumeToUnmount.OuterVolumeSpecName,
|
||||
volumeToUnmount.PodName,
|
||||
volumeToUnmount.PodUID,
|
||||
markVolMountedErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) generateUnmountDeviceFunc(
|
||||
deviceToDetach AttachedVolume,
|
||||
actualStateOfWorld ActualStateOfWorldMounterUpdater,
|
||||
mounter mount.Interface) (func() error, error) {
|
||||
// Get attacher plugin
|
||||
attachableVolumePlugin, err :=
|
||||
oe.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec)
|
||||
if err != nil || attachableVolumePlugin == nil {
|
||||
return nil, fmt.Errorf(
|
||||
"UnmountDevice.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
err)
|
||||
}
|
||||
|
||||
volumeDetacher, err := attachableVolumePlugin.NewDetacher()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"UnmountDevice.NewDetacher failed for volume %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
err)
|
||||
}
|
||||
|
||||
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"UnmountDevice.NewAttacher failed for volume %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
err)
|
||||
}
|
||||
|
||||
return func() error {
|
||||
deviceMountPath, err :=
|
||||
volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec)
|
||||
if err != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"GetDeviceMountPath failed for volume %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
err)
|
||||
}
|
||||
refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
|
||||
|
||||
if err != nil || hasMountRefs(deviceMountPath, refs) {
|
||||
if err == nil {
|
||||
err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs)
|
||||
}
|
||||
return fmt.Errorf(
|
||||
"GetDeviceMountRefs check failed for volume %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
err)
|
||||
}
|
||||
// Execute unmount
|
||||
unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath)
|
||||
if unmountDeviceErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"UnmountDevice failed for volume %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
unmountDeviceErr)
|
||||
}
|
||||
// Before logging that UnmountDevice succeeded and moving on,
|
||||
// use mounter.PathIsDevice to check if the path is a device,
|
||||
// if so use mounter.DeviceOpened to check if the device is in use anywhere
|
||||
// else on the system. Retry if it returns true.
|
||||
isDevicePath, devicePathErr := mounter.PathIsDevice(deviceToDetach.DevicePath)
|
||||
var deviceOpened bool
|
||||
var deviceOpenedErr error
|
||||
if !isDevicePath && devicePathErr == nil {
|
||||
// not a device path or path doesn't exist
|
||||
//TODO: refer to #36092
|
||||
glog.V(3).Infof("Not checking device path %s", deviceToDetach.DevicePath)
|
||||
deviceOpened = false
|
||||
} else {
|
||||
deviceOpened, deviceOpenedErr = mounter.DeviceOpened(deviceToDetach.DevicePath)
|
||||
if deviceOpenedErr != nil {
|
||||
return fmt.Errorf(
|
||||
"UnmountDevice.DeviceOpened failed for volume %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
deviceOpenedErr)
|
||||
}
|
||||
}
|
||||
// The device is still in use elsewhere. Caller will log and retry.
|
||||
if deviceOpened {
|
||||
return fmt.Errorf(
|
||||
"UnmountDevice failed for volume %q (spec.Name: %q) because the device is in use when it was no longer expected to be in use",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name())
|
||||
}
|
||||
|
||||
glog.Infof(
|
||||
"UnmountDevice succeeded for volume %q (spec.Name: %q).",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name())
|
||||
|
||||
// Update actual state of world
|
||||
markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
|
||||
deviceToDetach.VolumeName)
|
||||
if markDeviceUnmountedErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"MarkDeviceAsUnmounted failed for device %q (spec.Name: %q) with: %v",
|
||||
deviceToDetach.VolumeName,
|
||||
deviceToDetach.VolumeSpec.Name(),
|
||||
markDeviceUnmountedErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil
|
||||
func (oe *operationExecutor) GetNestedPendingOperations() nestedpendingoperations.NestedPendingOperations {
|
||||
return oe.pendingOperations
|
||||
}
|
||||
|
||||
// TODO: this is a workaround for the unmount device issue caused by gci mounter.
|
||||
@@ -1144,100 +490,3 @@ func hasMountRefs(mountPath string, mountRefs []string) bool {
|
||||
}
|
||||
return count > 0
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
|
||||
volumeToMount VolumeToMount,
|
||||
nodeName types.NodeName,
|
||||
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
|
||||
return func() error {
|
||||
if !volumeToMount.PluginIsAttachable {
|
||||
// If the volume does not implement the attacher interface, it is
|
||||
// assumed to be attached and the actual state of the world is
|
||||
// updated accordingly.
|
||||
|
||||
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
|
||||
volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
|
||||
if addVolumeNodeErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
addVolumeNodeErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if !volumeToMount.ReportedInUse {
|
||||
// If the given volume has not yet been added to the list of
|
||||
// VolumesInUse in the node's volume status, do not proceed, return
|
||||
// error. Caller will log and retry. The node status is updated
|
||||
// periodically by kubelet, so it may take as much as 10 seconds
|
||||
// before this clears.
|
||||
// Issue #28141 to enable on demand status updates.
|
||||
return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) has not yet been added to the list of VolumesInUse in the node's volume status",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID)
|
||||
}
|
||||
|
||||
// Fetch current node object
|
||||
node, fetchErr := oe.kubeClient.Core().Nodes().Get(string(nodeName), metav1.GetOptions{})
|
||||
if fetchErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"VerifyControllerAttachedVolume failed fetching node from API server. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
fetchErr)
|
||||
}
|
||||
|
||||
if node == nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"VerifyControllerAttachedVolume failed. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: node object retrieved from API server is nil",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID)
|
||||
}
|
||||
|
||||
for _, attachedVolume := range node.Status.VolumesAttached {
|
||||
if attachedVolume.Name == volumeToMount.VolumeName {
|
||||
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
|
||||
v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
|
||||
glog.Infof("Controller successfully attached volume %q (spec.Name: %q) pod %q (UID: %q) devicePath: %q",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
attachedVolume.DevicePath)
|
||||
|
||||
if addVolumeNodeErr != nil {
|
||||
// On failure, return error. Caller will log and retry.
|
||||
return fmt.Errorf(
|
||||
"VerifyControllerAttachedVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID,
|
||||
addVolumeNodeErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Volume not attached, return error. Caller will log and retry.
|
||||
return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) is not yet attached according to node status",
|
||||
volumeToMount.VolumeName,
|
||||
volumeToMount.VolumeSpec.Name(),
|
||||
volumeToMount.PodName,
|
||||
volumeToMount.Pod.UID)
|
||||
}, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user