Fix issue #34242: Attach/detach should recover from a crash
When the attach/detach controller crashes and a pod with an attached PV is deleted afterwards, the controller will never detach the pod's attached volumes. To prevent this, the controller should try to recover the state from the nodes' status.
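The fix is a two-phase, best-effort replay at controller startup. A minimal sketch of the sequence, assuming a hypothetical wrapper (populateActualStateOfWorld and populateDesiredStateOfWorld are the real methods added in the diff below; recoverStateSketch itself is not code from this commit):

// Hypothetical wrapper summarizing the recovery sequence this commit adds to Run().
func (adc *attachDetachController) recoverStateSketch() {
	// Phase 1: rebuild the actual state of world from each node's
	// Status.VolumesAttached. Volume specs are stored as nil placeholders;
	// a volume with no owning pod is detached on the first reconciliation
	// cycle, where no spec is needed.
	_ = adc.populateActualStateOfWorld()

	// Phase 2: replay existing pods into the desired state of world and
	// replace the nil specs from phase 1 with specs resolved from the pods.
	_ = adc.populateDesiredStateOfWorld()
}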
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/golang/glog"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -102,12 +103,16 @@ func NewAttachDetachController(
 	// dropped pods so they are continuously processed until it is accepted or
 	// deleted (probably can't do this with sharedInformer), etc.
 	adc := &attachDetachController{
-		kubeClient: kubeClient,
-		pvcLister:  pvcInformer.Lister(),
-		pvcsSynced: pvcInformer.Informer().HasSynced,
-		pvLister:   pvInformer.Lister(),
-		pvsSynced:  pvInformer.Informer().HasSynced,
-		cloud:      cloud,
+		kubeClient:  kubeClient,
+		pvcLister:   pvcInformer.Lister(),
+		pvcsSynced:  pvcInformer.Informer().HasSynced,
+		pvLister:    pvInformer.Lister(),
+		pvsSynced:   pvInformer.Informer().HasSynced,
+		podLister:   podInformer.Lister(),
+		podsSynced:  podInformer.Informer().HasSynced,
+		nodeLister:  nodeInformer.Lister(),
+		nodesSynced: nodeInformer.Informer().HasSynced,
+		cloud:       cloud,
 	}
 
 	if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil {
@@ -155,14 +160,12 @@ func NewAttachDetachController(
 		UpdateFunc: adc.podUpdate,
 		DeleteFunc: adc.podDelete,
 	})
-	adc.podsSynced = podInformer.Informer().HasSynced
 
 	nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
 		AddFunc:    adc.nodeAdd,
 		UpdateFunc: adc.nodeUpdate,
 		DeleteFunc: adc.nodeDelete,
 	})
-	adc.nodesSynced = nodeInformer.Informer().HasSynced
 
 	return adc, nil
 }
@@ -184,7 +187,10 @@ type attachDetachController struct {
 	pvLister  corelisters.PersistentVolumeLister
 	pvsSynced kcache.InformerSynced
 
-	podsSynced kcache.InformerSynced
+	podLister  corelisters.PodLister
+	podsSynced kcache.InformerSynced
+
+	nodeLister  corelisters.NodeLister
 	nodesSynced kcache.InformerSynced
 
 	// cloud provider used by volume host
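With the listers and HasSynced functions now stored on the controller, the standard client-go guard (hypothetical here, not part of this diff) would gate Run() on cache sync before the populate calls below read the caches:

// Hypothetical guard using the file's existing imports (kcache is
// k8s.io/client-go/tools/cache, runtime is k8s.io/apimachinery/pkg/util/runtime):
if !kcache.WaitForCacheSync(stopCh,
	adc.pvcsSynced, adc.pvsSynced, adc.podsSynced, adc.nodesSynced) {
	runtime.HandleError(fmt.Errorf("timed out waiting for informer caches to sync"))
	return
}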
@@ -239,12 +245,136 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
 		return
 	}
 
+	err := adc.populateActualStateOfWorld()
+	if err != nil {
+		glog.Errorf("Error populating the actual state of world: %v", err)
+	}
+	err = adc.populateDesiredStateOfWorld()
+	if err != nil {
+		glog.Errorf("Error populating the desired state of world: %v", err)
+	}
 	go adc.reconciler.Run(stopCh)
 	go adc.desiredStateOfWorldPopulator.Run(stopCh)
 
 	<-stopCh
 }
+
+func (adc *attachDetachController) populateActualStateOfWorld() error {
+	glog.V(5).Infof("Populating ActualStateOfWorld")
+	nodes, err := adc.nodeLister.List(labels.Everything())
+	if err != nil {
+		return err
+	}
+
+	for _, node := range nodes {
+		nodeName := types.NodeName(node.Name)
+		for _, attachedVolume := range node.Status.VolumesAttached {
+			uniqueName := attachedVolume.Name
+			// The nil VolumeSpec is safe only in the case the volume is not in use by
+			// any pod. In such a case it should be detached in the first reconciliation
+			// cycle and the volume spec is not needed to detach a volume. If the volume
+			// is used by a pod, its spec is filled in later by populateDesiredStateOfWorld,
+			// which scans the pods and updates their volumes in the ActualStateOfWorld too.
+			err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
+			if err != nil {
+				glog.Errorf("Failed to mark the volume as attached: %v", err)
+				continue
+			}
+			adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, true /* forceUnmount */)
+			if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists {
+				// Node specifies annotation indicating it should be managed by
+				// attach detach controller. Add it to desired state of world.
+				adc.desiredStateOfWorld.AddNode(types.NodeName(node.Name)) // Needed for DesiredStateOfWorld population
+			}
+		}
+	}
+	return nil
+}
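populateActualStateOfWorld consumes only fields that the kubelet already reports on the Node object. An illustrative fixture showing what it reads, using current upstream import paths (k8s.io/api/core/v1 and k8s.io/apimachinery/pkg/apis/meta/v1, which differ from this vintage of the tree); the annotation literal is the value behind volumehelper.ControllerManagedAttachAnnotation:

node := &v1.Node{
	ObjectMeta: metav1.ObjectMeta{
		Name: "node-1",
		Annotations: map[string]string{
			"volumes.kubernetes.io/controller-managed-attach-detach": "true",
		},
	},
	Status: v1.NodeStatus{
		// Read by populateActualStateOfWorld to mark volumes attached.
		VolumesAttached: []v1.AttachedVolume{
			{Name: "kubernetes.io/gce-pd/disk-1", DevicePath: "/dev/sdb"},
		},
		// Read by processVolumesInUse to decide the mounted flag.
		VolumesInUse: []v1.UniqueVolumeName{"kubernetes.io/gce-pd/disk-1"},
	},
}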
+
+func (adc *attachDetachController) getNodeVolumeDevicePath(
+	volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) {
+	var devicePath string
+	var found bool
+	node, err := adc.nodeLister.Get(string(nodeName))
+	if err != nil {
+		return devicePath, err
+	}
+	for _, attachedVolume := range node.Status.VolumesAttached {
+		if volumeName == attachedVolume.Name {
+			devicePath = attachedVolume.DevicePath
+			found = true
+			break
+		}
+	}
+	if !found {
+		err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName)
+	}
+
+	return devicePath, err
+}
+
+func (adc *attachDetachController) populateDesiredStateOfWorld() error {
+	glog.V(5).Infof("Populating DesiredStateOfWorld")
+
+	pods, err := adc.podLister.List(labels.Everything())
+	if err != nil {
+		return err
+	}
+	for _, pod := range pods {
+		podToAdd := pod
+		adc.podAdd(podToAdd)
+		for _, podVolume := range podToAdd.Spec.Volumes {
+			// The volume specs present in the ActualStateOfWorld are nil, let's replace those
+			// with the correct ones found on pods. The volumes present in the ASW with no
+			// corresponding pod will be detached and their specs are irrelevant.
+			volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
+			if err != nil {
+				glog.Errorf(
+					"Error creating spec for volume %q, pod %q/%q: %v",
+					podVolume.Name,
+					podToAdd.Namespace,
+					podToAdd.Name,
+					err)
+				continue
+			}
+			nodeName := types.NodeName(podToAdd.Spec.NodeName)
+			plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
+			if err != nil || plugin == nil {
+				glog.V(10).Infof(
+					"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
+					podVolume.Name,
+					podToAdd.Namespace,
+					podToAdd.Name,
+					err)
+				continue
+			}
+			volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
+			if err != nil {
+				glog.Errorf(
+					"Failed to find unique name for volume %q, pod %q/%q: %v",
+					podVolume.Name,
+					podToAdd.Namespace,
+					podToAdd.Name,
+					err)
+				continue
+			}
+			if adc.actualStateOfWorld.VolumeNodeExists(volumeName, nodeName) {
+				devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
+				if err != nil {
+					glog.Errorf("Failed to find device path: %v", err)
+					continue
+				}
+				err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
+				if err != nil {
+					glog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
+				}
+			}
+		}
+	}
+
+	return nil
+}
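util.CreateVolumeSpec is not part of this diff. For a PVC-backed pod volume it conceptually resolves pod volume -> PVC -> bound PV through the two listers it is handed; a simplified, hypothetical sketch of that resolution (resolvePVCVolume is not the real helper):

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// resolvePVCVolume is a hypothetical simplification of the lookup chain.
func resolvePVCVolume(
	podVolume v1.Volume, namespace string,
	pvcLister corelisters.PersistentVolumeClaimLister,
	pvLister corelisters.PersistentVolumeLister,
) (*v1.PersistentVolume, error) {
	if podVolume.PersistentVolumeClaim == nil {
		return nil, fmt.Errorf("volume %q is not PVC-backed", podVolume.Name)
	}
	pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(podVolume.PersistentVolumeClaim.ClaimName)
	if err != nil {
		return nil, err
	}
	// Spec.VolumeName is filled in once the claim is bound to a PV.
	if pvc.Spec.VolumeName == "" {
		return nil, fmt.Errorf("PVC %s/%s is not bound yet", namespace, pvc.Name)
	}
	return pvLister.Get(pvc.Spec.VolumeName)
}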
 
 func (adc *attachDetachController) podAdd(obj interface{}) {
 	pod, ok := obj.(*v1.Pod)
 	if pod == nil || !ok {
@@ -308,7 +438,7 @@ func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
 		// detach controller. Add it to desired state of world.
 		adc.desiredStateOfWorld.AddNode(nodeName)
 	}
-	adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
+	adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
 }
 
 func (adc *attachDetachController) nodeDelete(obj interface{}) {
@@ -322,7 +452,7 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
 		glog.V(10).Infof("%v", err)
 	}
 
-	adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
+	adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
 }
 
 // processVolumesInUse processes the list of volumes marked as "in-use"
@@ -330,7 +460,7 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
 // corresponding volume in the actual state of the world to indicate that it is
 // mounted.
 func (adc *attachDetachController) processVolumesInUse(
-	nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
+	nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName, forceUnmount bool) {
 	glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
 	for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
 		mounted := false
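The loop elided between this hunk and the next sets mounted by membership: an attached volume counts as mounted iff the node reports it in volumesInUse. A standalone, hypothetical equivalent of that check:

// volumeMounted reports whether the node lists the attached volume as in use.
func volumeMounted(name v1.UniqueVolumeName, volumesInUse []v1.UniqueVolumeName) bool {
	for _, inUse := range volumesInUse {
		if name == inUse {
			return true
		}
	}
	return false
}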
@@ -341,7 +471,7 @@ func (adc *attachDetachController) processVolumesInUse(
 			}
 		}
 		err := adc.actualStateOfWorld.SetVolumeMountedByNode(
-			attachedVolume.VolumeName, nodeName, mounted)
+			attachedVolume.VolumeName, nodeName, mounted, forceUnmount)
 		if err != nil {
 			glog.Warningf(
 				"SetVolumeMountedByNode(%q, %q, %q) returned an error: %v",