Add migrated field to storage_operation_duration_seconds metric

This commit is contained in:
Jiawei Wang 2021-02-12 17:35:01 -08:00
parent ef319e24bc
commit 6a7222cf4e
12 changed files with 307 additions and 168 deletions

View File

@ -23,7 +23,7 @@ import (
"regexp"
"testing"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@ -121,9 +121,9 @@ func TestSyncHandler(t *testing.T) {
var expController *expandController
expController, _ = expc.(*expandController)
var expansionCalled bool
expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) {
expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() volumetypes.OperationContext {
expansionCalled = true
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
})
if test.pv != nil {

View File

@ -51,6 +51,7 @@ import (
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/klog/v2"
)
@ -1412,7 +1413,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu
opComplete := util.OperationCompleteHook(pluginName, "volume_delete")
err = deleter.Delete()
opComplete(&err)
opComplete(volumetypes.CompleteFuncParam{Err: &err})
if err != nil {
// Deleter failed
return pluginName, false, err
@ -1558,7 +1559,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
volume, err = provisioner.Provision(selectedNode, allowedTopologies)
opComplete(&err)
opComplete(volumetypes.CompleteFuncParam{Err: &err})
if err != nil {
// Other places of failure have nothing to do with VolumeScheduling,
// so just let controller retry in the next sync. We'll only call func

View File

@ -142,6 +142,7 @@ func TranslateInTreeSpecToCSI(spec *volume.Spec, translator InTreeToCSITranslato
return nil, fmt.Errorf("failed to translate in-tree pv to CSI: %v", err)
}
return &volume.Spec{
Migrated: true,
PersistentVolume: csiPV,
ReadOnly: spec.ReadOnly,
InlineVolumeSpecForCSIMigration: inlineVolume,

View File

@ -473,6 +473,7 @@ type Spec struct {
PersistentVolume *v1.PersistentVolume
ReadOnly bool
InlineVolumeSpecForCSIMigration bool
Migrated bool
}
// Name returns the name of either Volume or PersistentVolume, one of which must not be nil.

View File

@ -18,6 +18,7 @@ package util
import (
"fmt"
"strconv"
"time"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -25,6 +26,7 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/types"
)
const (
@ -47,7 +49,7 @@ var storageOperationMetric = metrics.NewHistogramVec(
Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
StabilityLevel: metrics.ALPHA,
},
[]string{"volume_plugin", "operation_name", "status"},
[]string{"volume_plugin", "operation_name", "status", "migrated"},
)
var storageOperationStatusMetric = metrics.NewCounterVec(
@ -82,25 +84,29 @@ func registerMetrics() {
}
// OperationCompleteHook returns a hook to call when an operation is completed
func OperationCompleteHook(plugin, operationName string) func(*error) {
func OperationCompleteHook(plugin, operationName string) func(types.CompleteFuncParam) {
requestTime := time.Now()
opComplete := func(err *error) {
opComplete := func(c types.CompleteFuncParam) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
status := statusSuccess
if *err != nil {
if *c.Err != nil {
// TODO: Establish well-known error codes to be able to distinguish
// user configuration errors from system errors.
status = statusFailUnknown
}
storageOperationMetric.WithLabelValues(plugin, operationName, status).Observe(timeTaken)
migrated := false
if c.Migrated != nil {
migrated = *c.Migrated
}
storageOperationMetric.WithLabelValues(plugin, operationName, status, strconv.FormatBool(migrated)).Observe(timeTaken)
storageOperationStatusMetric.WithLabelValues(plugin, operationName, status).Inc()
}
return opComplete
}
// FSGroupCompleteHook returns a hook to call when volume recursive permission is changed
func FSGroupCompleteHook(plugin volume.VolumePlugin, spec *volume.Spec) func(*error) {
func FSGroupCompleteHook(plugin volume.VolumePlugin, spec *volume.Spec) func(types.CompleteFuncParam) {
return OperationCompleteHook(GetFullQualifiedPluginNameForVolume(plugin.GetPluginName(), spec), "volume_fsgroup_recursive_apply")
}

View File

@ -21,7 +21,7 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
@ -824,36 +824,38 @@ func testConcurrentOperationsNegative(
/* END concurrent operations tests */
func generateCallbackFunc(done chan<- interface{}) func() (error, error) {
return func() (error, error) {
func generateCallbackFunc(done chan<- interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
done <- true
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
}
func generateWaitFunc(done <-chan interface{}) func() (error, error) {
return func() (error, error) {
func generateWaitFunc(done <-chan interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
<-done
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
}
func panicFunc() (error, error) {
func panicFunc() volumetypes.OperationContext {
panic("testing panic")
}
func errorFunc() (error, error) {
return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2")
func errorFunc() volumetypes.OperationContext {
return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
}
func generateWaitWithErrorFunc(done <-chan interface{}) func() (error, error) {
return func() (error, error) {
func generateWaitWithErrorFunc(done <-chan interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
<-done
return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2")
return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
}
}
func noopFunc() (error, error) { return nil, nil }
func noopFunc() volumetypes.OperationContext {
return volumetypes.NewOperationContext(nil, nil, false)
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{

View File

@ -32,13 +32,13 @@ import (
type fakeOGCounter struct {
// calledFuncs stores name and count of functions
calledFuncs map[string]int
opFunc func() (error, error)
opFunc func() volumetypes.OperationContext
}
var _ OperationGenerator = &fakeOGCounter{}
// NewFakeOGCounter returns a OperationGenerator
func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator {
func NewFakeOGCounter(opFunc func() volumetypes.OperationContext) OperationGenerator {
return &fakeOGCounter{
calledFuncs: map[string]int{},
opFunc: opFunc,

View File

@ -585,63 +585,63 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
}
func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}
}
func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}
}
func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
@ -649,9 +649,9 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v
}
func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
@ -659,9 +659,9 @@ func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentV
}
func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
@ -673,9 +673,9 @@ func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
pluginNane string,
volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
@ -683,9 +683,9 @@ func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
}
func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
@ -693,9 +693,9 @@ func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout t
}
func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
@ -703,9 +703,9 @@ func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount Moun
}
func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
opFunc := func() (error, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,

View File

@ -185,7 +185,7 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
}
volumesAreAttachedFunc := func() (error, error) {
volumesAreAttachedFunc := func() volumetypes.OperationContext {
// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check
// whether the volumes are still attached.
@ -227,7 +227,8 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
}
}
return nil, nil
// It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached_per_node
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
@ -248,7 +249,7 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
// function except volumeSpecMap which contains original volume names for
// use with actualStateOfWorld
bulkVolumeVerifyFunc := func() (error, error) {
bulkVolumeVerifyFunc := func() volumetypes.OperationContext {
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
@ -256,7 +257,7 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
"BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v",
pluginName,
err)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
@ -266,19 +267,19 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
"BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v",
attachableVolumePlugin,
newAttacherErr)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier)
if !ok {
klog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes)
if bulkAttachErr != nil {
klog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
for nodeName, volumeSpecs := range pluginNodeVolumes {
@ -303,7 +304,8 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
}
}
return nil, nil
// It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
@ -319,16 +321,21 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
attachVolumeFunc := func() (error, error) {
attachVolumeFunc := func() volumetypes.OperationContext {
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
migrated := getMigratedStatusBySpec(volumeToAttach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err)
eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
if newAttacherErr != nil {
return volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr)
eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Execute attach
@ -349,7 +356,8 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}
// On failure, return error. Caller will log and retry.
return volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr)
eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Successful attach event is useful for user debugging
@ -364,10 +372,11 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
eventRecorderFunc := func(err *error) {
@ -452,7 +461,7 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err)
}
getVolumePluginMgrFunc := func() (error, error) {
detachVolumeFunc := func() volumetypes.OperationContext {
var err error
if verifySafeToDetach {
err = og.verifyVolumeIsSafeToDetach(volumeToDetach)
@ -460,11 +469,15 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
if err == nil {
err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
}
migrated := getMigratedStatusBySpec(volumeToDetach.VolumeSpec)
if err != nil {
// On failure, add volume back to ReportAsAttached list
actualStateOfWorld.AddVolumeToReportAsAttached(
volumeToDetach.VolumeName, volumeToDetach.NodeName)
return volumeToDetach.GenerateError("DetachVolume.Detach failed", err)
eventErr, detailedErr := volumeToDetach.GenerateError("DetachVolume.Detach failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", ""))
@ -473,12 +486,12 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
actualStateOfWorld.MarkVolumeAsDetached(
volumeToDetach.VolumeName, volumeToDetach.NodeName)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
OperationName: "volume_detach",
OperationFunc: getVolumePluginMgrFunc,
OperationFunc: detachVolumeFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"),
EventRecorderFunc: nil, // nil because we do not want to generate event on error
}, nil
@ -497,16 +510,21 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
volumePluginName = volumePlugin.GetPluginName()
}
mountVolumeFunc := func() (error, error) {
mountVolumeFunc := func() volumetypes.OperationContext {
// Get mounter plugin
volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil {
return volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
affinityErr := checkNodeAffinity(og, volumeToMount)
if affinityErr != nil {
return volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
volumeMounter, newMounterErr := volumePlugin.NewMounter(
@ -514,14 +532,15 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
volumeToMount.Pod,
volume.VolumeOptions{})
if newMounterErr != nil {
return volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
if mountCheckError != nil {
return volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Get attacher, if possible
@ -559,7 +578,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout)
if err != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)))
@ -576,7 +596,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
if err != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Mount device to global mount path
@ -588,7 +609,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
og.checkForFailedMount(volumeToMount, err)
og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, err, actualStateOfWorld)
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath)))
@ -598,7 +620,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
volumeToMount.VolumeName, devicePath, deviceMountPath)
if markDeviceMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// If volume expansion is performed after MountDevice but before SetUp then
@ -626,7 +649,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
"MountVolume.MountDevice failed to mark volume as uncertain",
markDeviceUncertainErr.Error()))
}
return volumeToMount.GenerateError("MountVolume.MountDevice failed while expanding volume", resizeError)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed while expanding volume", resizeError)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
@ -635,7 +659,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
err = fmt.Errorf(
"verify that your node machine has the required components before attempting to mount this volume type. %s",
canMountErr)
return volumeToMount.GenerateError("MountVolume.CanMount failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.CanMount failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
@ -661,7 +686,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
og.checkForFailedMount(volumeToMount, mountErr)
og.markVolumeErrorState(volumeToMount, markOpts, mountErr, actualStateOfWorld)
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
_, detailedMsg := volumeToMount.GenerateMsg("MountVolume.SetUp succeeded", "")
@ -681,17 +707,19 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
_, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil {
klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError)
return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
if markVolMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
eventRecorderFunc := func(err *error) {
@ -778,20 +806,24 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr)
}
unmountVolumeFunc := func() (error, error) {
unmountVolumeFunc := func() volumetypes.OperationContext {
subpather := og.volumePluginMgr.Host.GetSubpather()
migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
// Remove all bind-mounts for subPaths
podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID))
if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil {
return volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Execute unmount
unmountErr := volumeUnmounter.TearDown()
if unmountErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr)
eventErr, detailedErr := volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.Infof(
@ -812,7 +844,7 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error())
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
@ -844,14 +876,18 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceMounter failed", err)
}
unmountDeviceFunc := func() (error, error) {
unmountDeviceFunc := func() volumetypes.OperationContext {
migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec)
//deviceMountPath := deviceToDetach.DeviceMountPath
deviceMountPath, err :=
volumeDeviceMounter.GetDeviceMountPath(deviceToDetach.VolumeSpec)
if err != nil {
// On failure other than "does not exist", return error. Caller will log and retry.
if !strings.Contains(err.Error(), "does not exist") {
return deviceToDetach.GenerateError("GetDeviceMountPath failed", err)
eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountPath failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// If the mount path could not be found, don't fail the unmount, but instead log a warning and proceed,
// using the value from deviceToDetach.DeviceMountPath, so that the device can be marked as unmounted
@ -865,13 +901,15 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
if err == nil {
err = fmt.Errorf("the device mount path %q is still mounted by other references %v", deviceMountPath, refs)
}
return deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err)
eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Execute unmount
unmountDeviceErr := volumeDeviceUnmounter.UnmountDevice(deviceMountPath)
if unmountDeviceErr != nil {
// On failure, return error. Caller will log and retry.
return deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr)
eventErr, detailedErr := deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Before logging that UnmountDevice succeeded and moving on,
// use hostutil.PathIsDevice to check if the path is a device,
@ -879,13 +917,14 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
// else on the system. Retry if it returns true.
deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil)
if deviceOpenedErr != nil {
return nil, deviceOpenedErr
return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated)
}
// The device is still in use elsewhere. Caller will log and retry.
if deviceOpened {
return deviceToDetach.GenerateError(
eventErr, detailedErr := deviceToDetach.GenerateError(
"UnmountDevice failed",
goerrors.New("the device is in use when it was no longer expected to be in use"))
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", ""))
@ -895,10 +934,11 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
deviceToDetach.VolumeName)
if markDeviceUnmountedErr != nil {
// On failure, return error. Caller will log and retry.
return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
@ -958,15 +998,19 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
}
mapVolumeFunc := func() (simpleErr error, detailedErr error) {
mapVolumeFunc := func() (operationContext volumetypes.OperationContext) {
var devicePath string
var stagingPath string
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
// Set up global map path under the given plugin directory using symbolic link
globalMapPath, err :=
blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
if err != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.GetGlobalMapPath failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.GetGlobalMapPath failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
if volumeAttacher != nil {
// Wait for attachable volumes to finish attaching
@ -976,7 +1020,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout)
if err != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)))
@ -989,7 +1034,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
if mapErr != nil {
og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld)
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
@ -999,7 +1045,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
volumeToMount.VolumeName, markedDevicePath, globalMapPath)
if markDeviceMappedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
markVolumeOpts := MarkVolumeOpts{
@ -1020,14 +1067,15 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
if mapErr != nil {
// On failure, return error. Caller will log and retry.
og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld)
return volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// From now on, the volume is mapped. Mark it as uncertain on error,
// so it is is unmapped when corresponding pod is deleted.
defer func() {
if simpleErr != nil {
errText := simpleErr.Error()
if operationContext.EventErr != nil {
errText := operationContext.EventErr.Error()
og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld)
}
}()
@ -1038,7 +1086,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
devicePath = pluginDevicePath
}
if len(devicePath) == 0 {
return volumeToMount.GenerateError("MapVolume failed", goerrors.New("device path of the volume is empty"))
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume failed", goerrors.New("device path of the volume is empty"))
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
@ -1048,12 +1097,14 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
// AttachFileDevice will fail. If kubelet is not containerized, eval it anyway.
kvh, ok := og.GetVolumePluginMgr().Host.(volume.KubeletVolumeHost)
if !ok {
return volumeToMount.GenerateError("MapVolume type assertion error", fmt.Errorf("volume host does not implement KubeletVolumeHost interface"))
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume type assertion error", fmt.Errorf("volume host does not implement KubeletVolumeHost interface"))
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
hu := kvh.GetHostUtil()
devicePath, err = hu.EvalHostSymlinks(devicePath)
if err != nil {
return volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Update actual state of world with the devicePath again, if devicePath has changed from markedDevicePath
@ -1063,7 +1114,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
volumeToMount.VolumeName, devicePath, globalMapPath)
if markDeviceMappedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
@ -1072,7 +1124,8 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
mapErr := util.MapBlockVolume(og.blkUtil, devicePath, globalMapPath, volumeMapPath, volName, volumeToMount.Pod.UID)
if mapErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MapBlockVolume failed", mapErr)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapBlockVolume failed", mapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Device mapping for global map path succeeded
@ -1095,16 +1148,18 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
_, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil {
klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError)
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
if markVolMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
eventRecorderFunc := func(err *error) {
@ -1144,7 +1199,10 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc(
return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr)
}
unmapVolumeFunc := func() (error, error) {
unmapVolumeFunc := func() volumetypes.OperationContext {
migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
// pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath()
// plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID}
@ -1154,7 +1212,8 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc(
unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID)
if unmapErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr)
eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Call UnmapPodDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper
@ -1163,7 +1222,8 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc(
unmapErr = customBlockVolumeUnmapper.UnmapPodDevice()
if unmapErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr)
eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
@ -1185,7 +1245,7 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc(
klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error())
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
@ -1228,7 +1288,8 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr)
}
unmapDeviceFunc := func() (error, error) {
unmapDeviceFunc := func() volumetypes.OperationContext {
migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec)
// Search under globalMapPath dir if all symbolic links from pods have been removed already.
// If symbolic links are there, pods may still refer the volume.
globalMapPath := deviceToDetach.DeviceMountPath
@ -1238,12 +1299,14 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
// Looks like SetupDevice did not complete. Fall through to TearDownDevice and mark the device as unmounted.
refs = nil
} else {
return deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err)
eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
if len(refs) > 0 {
err = fmt.Errorf("the device %q is still referenced from other Pods %v", globalMapPath, refs)
return deviceToDetach.GenerateError("UnmapDevice failed", err)
eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Call TearDownDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper
@ -1252,7 +1315,8 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
unmapErr := customBlockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath)
if unmapErr != nil {
// On failure, return error. Caller will log and retry.
return deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr)
eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
}
@ -1261,7 +1325,8 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath)
if removeMapPathErr != nil {
// On failure, return error. Caller will log and retry.
return deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr)
eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Before logging that UnmapDevice succeeded and moving on,
@ -1270,13 +1335,14 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
// else on the system. Retry if it returns true.
deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil)
if deviceOpenedErr != nil {
return nil, deviceOpenedErr
return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated)
}
// The device is still in use elsewhere. Caller will log and retry.
if deviceOpened {
return deviceToDetach.GenerateError(
eventErr, detailedErr := deviceToDetach.GenerateError(
"UnmapDevice failed",
fmt.Errorf("the device is in use when it was no longer expected to be in use"))
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
klog.Infof(deviceToDetach.GenerateMsgDetailed("UnmapDevice succeeded", ""))
@ -1286,10 +1352,11 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
deviceToDetach.VolumeName)
if markDeviceUnmountedErr != nil {
// On failure, return error. Caller will log and retry.
return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
return volumetypes.GeneratedOperations{
@ -1310,7 +1377,8 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
}
verifyControllerAttachedVolumeFunc := func() (error, error) {
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
if !volumeToMount.PluginIsAttachable {
// If the volume does not implement the attacher interface, it is
// assumed to be attached and the actual state of the world is
@ -1320,10 +1388,11 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
if !volumeToMount.ReportedInUse {
@ -1333,21 +1402,24 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
// periodically by kubelet, so it may take as much as 10 seconds
// before this clears.
// Issue #28141 to enable on demand status updates.
return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
eventErr, detailedErr := volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// Fetch current node object
node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nodeName), metav1.GetOptions{})
if fetchErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
if node == nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError(
eventErr, detailedErr := volumeToMount.GenerateError(
"VerifyControllerAttachedVolume failed",
fmt.Errorf("node object retrieved from API server is nil"))
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
for _, attachedVolume := range node.Status.VolumesAttached {
@ -1357,14 +1429,16 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
klog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath)))
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
}
// Volume not attached, return error. Caller will log and retry.
return volumeToMount.GenerateError("Volume not attached according to node status", nil)
eventErr, detailedErr := volumeToMount.GenerateError("Volume not attached according to node status", nil)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
return volumetypes.GeneratedOperations{
@ -1425,7 +1499,10 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
}
expandVolumeFunc := func() (error, error) {
expandVolumeFunc := func() volumetypes.OperationContext {
migrated := false
newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
statusSize := pvc.Status.Capacity[v1.ResourceStorage]
pvSize := pv.Spec.Capacity[v1.ResourceStorage]
@ -1436,7 +1513,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
statusSize)
if expandErr != nil {
detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr)
return detailedErr, detailedErr
return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
}
klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
@ -1448,7 +1525,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient)
if updateErr != nil {
detailedErr := fmt.Errorf("error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
return detailedErr, detailedErr
return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
}
klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
@ -1463,7 +1540,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
err := util.MarkResizeFinished(pvc, newSize, og.kubeClient)
if err != nil {
detailedErr := fmt.Errorf("error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return detailedErr, detailedErr
return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
}
successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
@ -1472,10 +1549,10 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
if err != nil {
detailedErr := fmt.Errorf("error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
klog.Warning(detailedErr)
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
}
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
eventRecorderFunc := func(err *error) {
@ -1502,16 +1579,19 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("NodeExpandVolume.FindPluginBySpec failed", err)
}
fsResizeFunc := func() (error, error) {
fsResizeFunc := func() volumetypes.OperationContext {
var resizeDone bool
var simpleErr, detailedErr error
var eventErr, detailedErr error
migrated := false
resizeOptions := volume.NodeResizeOptions{
VolumeSpec: volumeToMount.VolumeSpec,
DevicePath: volumeToMount.DevicePath,
}
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
if err != nil {
return volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err)
eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
if fsVolume {
@ -1520,7 +1600,8 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
volumeToMount.Pod,
volume.VolumeOptions{})
if newMounterErr != nil {
return volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr)
eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
resizeOptions.DeviceMountPath = volumeMounter.GetPath()
@ -1534,7 +1615,8 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
if volumeDeviceMounter != nil {
deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
if err != nil {
return volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err)
eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
resizeOptions.DeviceStagePath = deviceStagePath
}
@ -1543,11 +1625,13 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
blockVolumePlugin, err :=
og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
if err != nil {
return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err)
eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
if blockVolumePlugin == nil {
return volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
@ -1555,7 +1639,8 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
volumeToMount.Pod,
volume.VolumeOptions{})
if newMapperErr != nil {
return volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
// if plugin supports custom mappers lets add DeviceStagePath
@ -1566,16 +1651,17 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
// if we are doing online expansion then volume is already published
resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
if simpleErr != nil || detailedErr != nil {
return simpleErr, detailedErr
resizeDone, eventErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
if eventErr != nil || detailedErr != nil {
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
if resizeDone {
return nil, nil
return volumetypes.NewOperationContext(nil, nil, migrated)
}
// This is a placeholder error - we should NEVER reach here.
err = fmt.Errorf("volume resizing failed for unknown reason")
return volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err)
eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
eventRecorderFunc := func(err *error) {
@ -1762,3 +1848,11 @@ func findDetachablePluginBySpec(spec *volume.Spec, pm *volume.VolumePluginMgr) (
}
return nil, nil
}
func getMigratedStatusBySpec(spec *volume.Spec) bool {
migrated := false
if spec != nil {
migrated = spec.Migrated
}
return migrated
}

View File

@ -17,9 +17,12 @@ limitations under the License.
package operationexecutor
import (
"github.com/prometheus/client_model/go"
"os"
"testing"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
@ -32,8 +35,7 @@ import (
csitesting "k8s.io/kubernetes/pkg/volume/csi/testing"
"k8s.io/kubernetes/pkg/volume/gcepd"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"os"
"testing"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// this method just tests the volume plugin name that's used in CompleteFunc, the same plugin is also used inside the
@ -92,7 +94,7 @@ func TestOperationGenerator_GenerateUnmapVolumeFunc_PluginName(t *testing.T) {
storageOperationStatusCountMetricBefore := findMetricWithNameAndLabels(metricFamilyName, labelFilter)
var ee error
unmapVolumeFunc.CompleteFunc(&ee)
unmapVolumeFunc.CompleteFunc(volumetypes.CompleteFuncParam{Err: &ee})
storageOperationStatusCountMetricAfter := findMetricWithNameAndLabels(metricFamilyName, labelFilter)
if storageOperationStatusCountMetricAfter == nil {

View File

@ -36,22 +36,49 @@ type UniquePVCName types.UID
type GeneratedOperations struct {
// Name of operation - could be used for resetting shared exponential backoff
OperationName string
OperationFunc func() (eventErr error, detailedErr error)
OperationFunc func() (context OperationContext)
EventRecorderFunc func(*error)
CompleteFunc func(*error)
CompleteFunc func(CompleteFuncParam)
}
type OperationContext struct {
EventErr error
DetailedErr error
Migrated bool
}
func NewOperationContext(eventErr, detailedErr error, migrated bool) OperationContext {
return OperationContext{
EventErr: eventErr,
DetailedErr: detailedErr,
Migrated: migrated,
}
}
type CompleteFuncParam struct {
Err *error
Migrated *bool
}
// Run executes the operations and its supporting functions
func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
var context OperationContext
if o.CompleteFunc != nil {
defer o.CompleteFunc(&detailedErr)
c := CompleteFuncParam{
Err: &context.DetailedErr,
Migrated: &context.Migrated,
}
c.Err = &detailedErr
defer o.CompleteFunc(c)
}
if o.EventRecorderFunc != nil {
defer o.EventRecorderFunc(&eventErr)
}
// Handle panic, if any, from operationFunc()
defer runtime.RecoverFromPanic(&detailedErr)
return o.OperationFunc()
context = o.OperationFunc()
return context.EventErr, context.DetailedErr
}
// FailedPrecondition error indicates CSI operation returned failed precondition

View File

@ -29,6 +29,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume/util/types"
)
const (
@ -40,7 +41,7 @@ const (
// SetVolumeOwnership modifies the given volume to be owned by
// fsGroup, and sets SetGid so that newly created files are owned by
// fsGroup. If fsGroup is nil nothing is done.
func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1.PodFSGroupChangePolicy, completeFunc func(*error)) error {
func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1.PodFSGroupChangePolicy, completeFunc func(types.CompleteFuncParam)) error {
if fsGroup == nil {
return nil
}
@ -57,7 +58,9 @@ func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1
if !fsGroupPolicyEnabled {
err := legacyOwnershipChange(mounter, fsGroup)
if completeFunc != nil {
completeFunc(&err)
completeFunc(types.CompleteFuncParam{
Err: &err,
})
}
return err
}
@ -74,7 +77,9 @@ func SetVolumeOwnership(mounter Mounter, fsGroup *int64, fsGroupChangePolicy *v1
return changeFilePermission(path, fsGroup, mounter.GetAttributes().ReadOnly, info)
})
if completeFunc != nil {
completeFunc(&err)
completeFunc(types.CompleteFuncParam{
Err: &err,
})
}
return err
}