mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 05:03:09 +00:00
Refactor volume attach code
This commit is contained in:
parent
6eea80ec97
commit
10f91a9951
@ -655,6 +655,19 @@ func (asw *actualStateOfWorld) SetDeviceMountState(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) {
|
||||||
|
asw.Lock()
|
||||||
|
defer asw.Unlock()
|
||||||
|
|
||||||
|
volumeObj, ok := asw.attachedVolumes[volumeName]
|
||||||
|
// only set volume claim size if claimStatusSize is zero
|
||||||
|
// this can happen when volume was rebuilt after kubelet startup
|
||||||
|
if ok && volumeObj.persistentVolumeSize.IsZero() {
|
||||||
|
volumeObj.persistentVolumeSize = claimSize
|
||||||
|
asw.attachedVolumes[volumeName] = volumeObj
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (asw *actualStateOfWorld) DeletePodFromVolume(
|
func (asw *actualStateOfWorld) DeletePodFromVolume(
|
||||||
podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error {
|
podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error {
|
||||||
asw.Lock()
|
asw.Lock()
|
||||||
|
@ -169,7 +169,7 @@ func (rc *reconciler) reconcile() {
|
|||||||
// attach if kubelet is responsible for attaching volumes.
|
// attach if kubelet is responsible for attaching volumes.
|
||||||
// If underlying PVC was resized while in-use then this function also handles volume
|
// If underlying PVC was resized while in-use then this function also handles volume
|
||||||
// resizing.
|
// resizing.
|
||||||
rc.mountAttachVolumes()
|
rc.mountOrAttachVolumes()
|
||||||
|
|
||||||
// Ensure devices that should be detached/unmounted are detached/unmounted.
|
// Ensure devices that should be detached/unmounted are detached/unmounted.
|
||||||
rc.unmountDetachDevices()
|
rc.unmountDetachDevices()
|
||||||
@ -193,82 +193,94 @@ func (rc *reconciler) unmountVolumes() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *reconciler) mountAttachVolumes() {
|
func (rc *reconciler) mountOrAttachVolumes() {
|
||||||
// Ensure volumes that should be attached/mounted are attached/mounted.
|
// Ensure volumes that should be attached/mounted are attached/mounted.
|
||||||
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
|
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
|
||||||
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
|
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
|
||||||
volumeToMount.DevicePath = devicePath
|
volumeToMount.DevicePath = devicePath
|
||||||
if cache.IsVolumeNotAttachedError(err) {
|
if cache.IsVolumeNotAttachedError(err) {
|
||||||
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
|
rc.waitForVolumeAttach(volumeToMount)
|
||||||
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
|
|
||||||
if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
|
|
||||||
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
|
|
||||||
// for controller to finish attaching volume.
|
|
||||||
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
err := rc.operationExecutor.VerifyControllerAttachedVolume(
|
|
||||||
volumeToMount.VolumeToMount,
|
|
||||||
rc.nodeName,
|
|
||||||
rc.actualStateOfWorld)
|
|
||||||
if err != nil && !isExpectedError(err) {
|
|
||||||
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
|
|
||||||
// so attach it
|
|
||||||
volumeToAttach := operationexecutor.VolumeToAttach{
|
|
||||||
VolumeName: volumeToMount.VolumeName,
|
|
||||||
VolumeSpec: volumeToMount.VolumeSpec,
|
|
||||||
NodeName: rc.nodeName,
|
|
||||||
}
|
|
||||||
klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
|
|
||||||
if err != nil && !isExpectedError(err) {
|
|
||||||
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if !volMounted || cache.IsRemountRequiredError(err) {
|
} else if !volMounted || cache.IsRemountRequiredError(err) {
|
||||||
// Volume is not mounted, or is already mounted, but requires remounting
|
rc.mountAttachedVolumes(volumeToMount, err)
|
||||||
remountingLogStr := ""
|
|
||||||
isRemount := cache.IsRemountRequiredError(err)
|
|
||||||
if isRemount {
|
|
||||||
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
|
|
||||||
}
|
|
||||||
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
err := rc.operationExecutor.MountVolume(
|
|
||||||
rc.waitForAttachTimeout,
|
|
||||||
volumeToMount.VolumeToMount,
|
|
||||||
rc.actualStateOfWorld,
|
|
||||||
isRemount)
|
|
||||||
if err != nil && !isExpectedError(err) {
|
|
||||||
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
if remountingLogStr == "" {
|
|
||||||
klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
} else {
|
|
||||||
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if cache.IsFSResizeRequiredError(err) {
|
} else if cache.IsFSResizeRequiredError(err) {
|
||||||
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
|
rc.expandVolume(volumeToMount)
|
||||||
err := rc.operationExecutor.ExpandInUseVolume(
|
}
|
||||||
volumeToMount.VolumeToMount,
|
}
|
||||||
rc.actualStateOfWorld)
|
}
|
||||||
if err != nil && !isExpectedError(err) {
|
|
||||||
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount) {
|
||||||
}
|
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
if err == nil {
|
err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld)
|
||||||
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
|
|
||||||
}
|
if err != nil && !isExpectedError(err) {
|
||||||
|
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
|
||||||
|
// Volume is not mounted, or is already mounted, but requires remounting
|
||||||
|
remountingLogStr := ""
|
||||||
|
isRemount := cache.IsRemountRequiredError(podExistError)
|
||||||
|
if isRemount {
|
||||||
|
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
|
||||||
|
}
|
||||||
|
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
err := rc.operationExecutor.MountVolume(
|
||||||
|
rc.waitForAttachTimeout,
|
||||||
|
volumeToMount.VolumeToMount,
|
||||||
|
rc.actualStateOfWorld,
|
||||||
|
isRemount)
|
||||||
|
if err != nil && !isExpectedError(err) {
|
||||||
|
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
if remountingLogStr == "" {
|
||||||
|
klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
} else {
|
||||||
|
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
|
||||||
|
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
|
||||||
|
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
|
||||||
|
if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
|
||||||
|
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
|
||||||
|
// for controller to finish attaching volume.
|
||||||
|
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
err := rc.operationExecutor.VerifyControllerAttachedVolume(
|
||||||
|
volumeToMount.VolumeToMount,
|
||||||
|
rc.nodeName,
|
||||||
|
rc.actualStateOfWorld)
|
||||||
|
if err != nil && !isExpectedError(err) {
|
||||||
|
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
|
||||||
|
// so attach it
|
||||||
|
volumeToAttach := operationexecutor.VolumeToAttach{
|
||||||
|
VolumeName: volumeToMount.VolumeName,
|
||||||
|
VolumeSpec: volumeToMount.VolumeSpec,
|
||||||
|
NodeName: rc.nodeName,
|
||||||
|
}
|
||||||
|
klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
|
||||||
|
if err != nil && !isExpectedError(err) {
|
||||||
|
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -245,6 +245,9 @@ type ActualStateOfWorldAttacherUpdater interface {
|
|||||||
// Unmarks the desire to detach for the specified volume (add the volume back to
|
// Unmarks the desire to detach for the specified volume (add the volume back to
|
||||||
// the node's volumesToReportAsAttached list)
|
// the node's volumesToReportAsAttached list)
|
||||||
AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
|
AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
|
||||||
|
|
||||||
|
// SetVolumeClaimSize sets pvc claim size by reading pvc.Status.Capacity
|
||||||
|
SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity)
|
||||||
}
|
}
|
||||||
|
|
||||||
// VolumeLogger defines a set of operations for generating volume-related logging and error msgs
|
// VolumeLogger defines a set of operations for generating volume-related logging and error msgs
|
||||||
|
@ -1491,6 +1491,21 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
|
|||||||
|
|
||||||
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
|
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
|
||||||
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
|
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
|
||||||
|
var claimSize resource.Quantity
|
||||||
|
|
||||||
|
if volumeToMount.VolumeSpec.PersistentVolume != nil {
|
||||||
|
pv := volumeToMount.VolumeSpec.PersistentVolume
|
||||||
|
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume fetching pvc failed", err)
|
||||||
|
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
|
||||||
|
}
|
||||||
|
pvcStatusSize := pvc.Status.Capacity.Storage()
|
||||||
|
if pvcStatusSize != nil {
|
||||||
|
claimSize = *pvcStatusSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !volumeToMount.PluginIsAttachable {
|
if !volumeToMount.PluginIsAttachable {
|
||||||
// If the volume does not implement the attacher interface, it is
|
// If the volume does not implement the attacher interface, it is
|
||||||
// assumed to be attached and the actual state of the world is
|
// assumed to be attached and the actual state of the world is
|
||||||
@ -1503,7 +1518,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
|
|||||||
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
|
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
|
||||||
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
|
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
|
||||||
}
|
}
|
||||||
|
actualStateOfWorld.SetVolumeClaimSize(volumeToMount.VolumeName, claimSize)
|
||||||
return volumetypes.NewOperationContext(nil, nil, migrated)
|
return volumetypes.NewOperationContext(nil, nil, migrated)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1544,6 +1559,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
|
|||||||
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
|
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
|
||||||
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
|
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
|
||||||
}
|
}
|
||||||
|
actualStateOfWorld.SetVolumeClaimSize(volumeToMount.VolumeName, claimSize)
|
||||||
return volumetypes.NewOperationContext(nil, nil, migrated)
|
return volumetypes.NewOperationContext(nil, nil, migrated)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user