mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-29 21:29:24 +00:00
Introduce new kubelet volume manager
This commit adds a new volume manager in kubelet that synchronizes volume mount/unmount (and attach/detach, if attach/detach controller is not enabled). This eliminates the race conditions between the pod creation loop and the orphaned volumes loops. It also removes the unmount/detach from the `syncPod()` path so volume clean up never blocks the `syncPod` loop.
This commit is contained in:
@@ -23,13 +23,16 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attacherdetacher"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/cache"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
)
|
||||
|
||||
// Reconciler runs a periodic loop to reconcile the desired state of the with
|
||||
// the actual state of the world by triggering attach detach operations.
|
||||
// Note: This is distinct from the Reconciler implemented by the kubelet volume
|
||||
// manager. This reconciles state for the attach/detach controller. That
|
||||
// reconciles state for the kubelet volume manager.
|
||||
type Reconciler interface {
|
||||
// Starts running the reconciliation loop which executes periodically, checks
|
||||
// if volumes that should be attached are attached and volumes that should
|
||||
@@ -52,7 +55,7 @@ func NewReconciler(
|
||||
maxWaitForUnmountDuration time.Duration,
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld,
|
||||
actualStateOfWorld cache.ActualStateOfWorld,
|
||||
attacherDetacher attacherdetacher.AttacherDetacher) Reconciler {
|
||||
attacherDetacher operationexecutor.OperationExecutor) Reconciler {
|
||||
return &reconciler{
|
||||
loopPeriod: loopPeriod,
|
||||
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
|
||||
@@ -67,7 +70,7 @@ type reconciler struct {
|
||||
maxWaitForUnmountDuration time.Duration
|
||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||
actualStateOfWorld cache.ActualStateOfWorld
|
||||
attacherDetacher attacherdetacher.AttacherDetacher
|
||||
attacherDetacher operationexecutor.OperationExecutor
|
||||
}
|
||||
|
||||
func (rc *reconciler) Run(stopCh <-chan struct{}) {
|
||||
@@ -86,7 +89,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
|
||||
// Volume exists in actual state of world but not desired
|
||||
if !attachedVolume.MountedByNode {
|
||||
glog.V(5).Infof("Attempting to start DetachVolume for volume %q to node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
|
||||
err := rc.attacherDetacher.DetachVolume(attachedVolume, rc.actualStateOfWorld)
|
||||
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
|
||||
if err == nil {
|
||||
glog.Infof("Started DetachVolume for volume %q to node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
|
||||
}
|
||||
@@ -98,7 +101,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
|
||||
}
|
||||
if timeElapsed > rc.maxWaitForUnmountDuration {
|
||||
glog.V(5).Infof("Attempting to start DetachVolume for volume %q to node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
|
||||
err := rc.attacherDetacher.DetachVolume(attachedVolume, rc.actualStateOfWorld)
|
||||
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
|
||||
if err == nil {
|
||||
glog.Infof("Started DetachVolume for volume %q to node %q due to maxWaitForUnmountDuration expiry.", attachedVolume.VolumeName, attachedVolume.NodeName)
|
||||
}
|
||||
@@ -121,7 +124,7 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
|
||||
} else {
|
||||
// Volume/Node doesn't exist, spawn a goroutine to attach it
|
||||
glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||
err := rc.attacherDetacher.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
|
||||
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
|
||||
if err == nil {
|
||||
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
|
||||
}
|
||||
|
||||
@@ -21,12 +21,12 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attacherdetacher"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/cache"
|
||||
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/testing"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -38,10 +38,10 @@ const (
|
||||
// Verifies there are no calls to attach or detach.
|
||||
func Test_Run_Positive_DoNothing(t *testing.T) {
|
||||
// Arrange
|
||||
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
|
||||
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||
asw := cache.NewActualStateOfWorld(volumePluginMgr)
|
||||
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
|
||||
ad := operationexecutor.NewOperationExecutor(volumePluginMgr)
|
||||
reconciler := NewReconciler(
|
||||
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
|
||||
|
||||
@@ -61,13 +61,13 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
|
||||
// Verifies there is one attach call and no detach calls.
|
||||
func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
|
||||
// Arrange
|
||||
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
|
||||
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||
asw := cache.NewActualStateOfWorld(volumePluginMgr)
|
||||
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
|
||||
ad := operationexecutor.NewOperationExecutor(volumePluginMgr)
|
||||
reconciler := NewReconciler(
|
||||
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
|
||||
podName := types.UniquePodName("pod-name")
|
||||
podName := types.UniquePodName("pod-uid")
|
||||
volumeName := api.UniqueVolumeName("volume-name")
|
||||
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
|
||||
nodeName := "node-name"
|
||||
@@ -102,13 +102,13 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
|
||||
// Verifies there is one detach call and no (new) attach calls.
|
||||
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *testing.T) {
|
||||
// Arrange
|
||||
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
|
||||
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||
asw := cache.NewActualStateOfWorld(volumePluginMgr)
|
||||
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
|
||||
ad := operationexecutor.NewOperationExecutor(volumePluginMgr)
|
||||
reconciler := NewReconciler(
|
||||
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
|
||||
podName := types.UniquePodName("pod-name")
|
||||
podName := types.UniquePodName("pod-uid")
|
||||
volumeName := api.UniqueVolumeName("volume-name")
|
||||
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
|
||||
nodeName := "node-name"
|
||||
@@ -164,13 +164,13 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
|
||||
// Verifies there is one detach call and no (new) attach calls.
|
||||
func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *testing.T) {
|
||||
// Arrange
|
||||
volumePluginMgr, fakePlugin := controllervolumetesting.GetTestVolumePluginMgr((t))
|
||||
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||
asw := cache.NewActualStateOfWorld(volumePluginMgr)
|
||||
ad := attacherdetacher.NewAttacherDetacher(volumePluginMgr)
|
||||
ad := operationexecutor.NewOperationExecutor(volumePluginMgr)
|
||||
reconciler := NewReconciler(
|
||||
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad)
|
||||
podName := types.UniquePodName("pod-name")
|
||||
podName := types.UniquePodName("pod-uid")
|
||||
volumeName := api.UniqueVolumeName("volume-name")
|
||||
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
|
||||
nodeName := "node-name"
|
||||
|
||||
Reference in New Issue
Block a user