Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-27 05:27:21 +00:00)

Merge pull request #96689 from gnufied/fix-unknown-node-dangling-vsphere
Fix dangling volumes from nodes not tracked by attach detach controller

Commit fc43c80ccd
@@ -311,6 +311,17 @@ func (nm *NodeManager) GetNodeDetails() ([]NodeDetails, error) {
 	return nodeDetails, nil
 }
 
+// GetNodeNames returns list of nodes that are known to vsphere cloudprovider.
+// These are typically nodes that make up k8s cluster.
+func (nm *NodeManager) GetNodeNames() []k8stypes.NodeName {
+	nodes := nm.getNodes()
+	var nodeNameList []k8stypes.NodeName
+	for _, node := range nodes {
+		nodeNameList = append(nodeNameList, k8stypes.NodeName(node.Name))
+	}
+	return nodeNameList
+}
+
 func (nm *NodeManager) refreshNodes() (errList []error) {
 	for nodeName := range nm.getNodes() {
 		nodeInfo, err := nm.getRefreshedNodeInfo(convertToK8sType(nodeName))
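GetNodeNames feeds the dangling-volume scan added later in this PR: the provider lists every node it knows about and then visits only the ones the attach-detach-driven verification never touched. A minimal sketch of that filtering step, assuming a hypothetical NodeName type and a "verified" set (illustrative only, not the provider's actual API):

package main

import "fmt"

// NodeName stands in for k8stypes.NodeName in this illustration.
type NodeName string

// unverifiedNodes returns the nodes missing from the verified set,
// i.e. the nodes a full rescan would still have to visit.
func unverifiedNodes(all []NodeName, verified map[NodeName]bool) []NodeName {
	var missing []NodeName
	for _, n := range all {
		if !verified[n] {
			missing = append(missing, n)
		}
	}
	return missing
}

func main() {
	all := []NodeName{"node1.lan", "node2.lan", "node3.lan"}
	verified := map[NodeName]bool{"node1.lan": true}
	fmt.Println(unverifiedNodes(all, verified)) // [node2.lan node3.lan]
}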
@@ -1221,9 +1221,15 @@ func (vs *VSphere) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string)
 			}
 		}
 
+		klog.V(4).Infof("DisksAreAttached successfully executed. result: %+v", attached)
+		// There could be nodes in cluster which do not have any pods with vsphere volumes running on them
+		// such nodes won't be part of nodeVolumes map because attach-detach controller does not keep track
+		// such nodes. But such nodes may still have dangling volumes on them and hence we need to scan all the
+		// remaining nodes which weren't scanned by code previously.
+		vs.BuildMissingVolumeNodeMap(ctx)
 		// any volume which we could not verify will be removed from the map.
 		vs.vsphereVolumeMap.RemoveUnverified()
-		klog.V(4).Infof("DisksAreAttach successfully executed. result: %+v", attached)
+		klog.V(4).Infof("current node volume map is: %+v", vs.vsphereVolumeMap.volumeNodeMap)
 		return disksAttached, nil
 	}
 	requestTime := time.Now()
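The flow added here is essentially mark-and-sweep: every known volume is first reset to unverified, the attach-detach-driven check plus BuildMissingVolumeNodeMap re-mark whatever can still be seen on a VM, and RemoveUnverified drops the rest as dangling. A small self-contained sketch of that lifecycle, with a toy map type standing in for VsphereVolumeMap (names below are illustrative, not the provider's):

package main

import "fmt"

type volStatus struct {
	node     string
	verified bool
}

type volumeMap struct{ m map[string]volStatus }

// startVerification resets every known volume to unverified (the "mark phase" setup).
func (v *volumeMap) startVerification() {
	for k, s := range v.m {
		s.verified = false
		v.m[k] = s
	}
}

// add records a volume seen on a node during the scan and marks it verified.
func (v *volumeMap) add(path, node string) { v.m[path] = volStatus{node, true} }

// removeUnverified sweeps out every volume the scan could not confirm.
func (v *volumeMap) removeUnverified() {
	for k, s := range v.m {
		if !s.verified {
			delete(v.m, k)
		}
	}
}

func main() {
	vm := &volumeMap{m: map[string]volStatus{}}
	vm.add("[ds] kubevols/a.vmdk", "node1")
	vm.add("[ds] kubevols/b.vmdk", "node2")

	vm.startVerification()
	vm.add("[ds] kubevols/a.vmdk", "node1") // only a.vmdk is seen again
	vm.removeUnverified()

	fmt.Println(len(vm.m)) // 1: b.vmdk was dropped as unverified
}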
@@ -26,6 +26,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/vmware/govmomi/find"
@@ -608,6 +609,123 @@ func (vs *VSphere) checkDiskAttached(ctx context.Context, nodes []k8stypes.NodeN
 	return nodesToRetry, nil
 }
 
+// BuildMissingVolumeNodeMap builds a map of volumes and nodes which are not known to attach detach controller.
+// There could be nodes in cluster which do not have any pods with vsphere volumes running on them
+// such nodes won't be part of disk verification check because attach-detach controller does not keep track
+// such nodes. But such nodes may still have dangling volumes on them and hence we need to scan all the
+// remaining nodes which weren't scanned by code previously.
+func (vs *VSphere) BuildMissingVolumeNodeMap(ctx context.Context) {
+	nodeNames := vs.nodeManager.GetNodeNames()
+	// Segregate nodes according to VC-DC
+	dcNodes := make(map[string][]k8stypes.NodeName)
+
+	for _, nodeName := range nodeNames {
+		// if given node is not in node volume map
+		if !vs.vsphereVolumeMap.CheckForNode(nodeName) {
+			nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName)
+			if err != nil {
+				klog.V(4).Infof("Failed to get node info: %+v. err: %+v", nodeInfo.vm, err)
+				continue
+			}
+			vcDC := nodeInfo.vcServer + nodeInfo.dataCenter.String()
+			dcNodes[vcDC] = append(dcNodes[vcDC], nodeName)
+		}
+	}
+
+	var wg sync.WaitGroup
+
+	for _, nodeNames := range dcNodes {
+		// Start go routines per VC-DC to check disks are attached
+		wg.Add(1)
+		go func(nodes []k8stypes.NodeName) {
+			err := vs.checkNodeDisks(ctx, nodeNames)
+			if err != nil {
+				klog.Errorf("Failed to check disk attached for nodes: %+v. err: %+v", nodes, err)
+			}
+			wg.Done()
+		}(nodeNames)
+	}
+	wg.Wait()
+}
+
+func (vs *VSphere) checkNodeDisks(ctx context.Context, nodeNames []k8stypes.NodeName) error {
+	var vmList []*vclib.VirtualMachine
+	var nodeInfo NodeInfo
+	var err error
+
+	for _, nodeName := range nodeNames {
+		nodeInfo, err = vs.nodeManager.GetNodeInfo(nodeName)
+		if err != nil {
+			return err
+		}
+		vmList = append(vmList, nodeInfo.vm)
+	}
+
+	// Making sure session is valid
+	_, err = vs.getVSphereInstanceForServer(nodeInfo.vcServer, ctx)
+	if err != nil {
+		return err
+	}
+
+	// If any of the nodes are not present property collector query will fail for entire operation
+	vmMoList, err := nodeInfo.dataCenter.GetVMMoList(ctx, vmList, []string{"config.hardware.device", "name", "config.uuid"})
+	if err != nil {
+		if vclib.IsManagedObjectNotFoundError(err) {
+			klog.V(4).Infof("checkNodeDisks: ManagedObjectNotFound for property collector query for nodes: %+v vms: %+v", nodeNames, vmList)
+			// Property Collector Query failed
+			// VerifyVolumePaths per VM
+			for _, nodeName := range nodeNames {
+				nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName)
+				if err != nil {
+					return err
+				}
+				devices, err := nodeInfo.vm.VirtualMachine.Device(ctx)
+				if err != nil {
+					if vclib.IsManagedObjectNotFoundError(err) {
+						klog.V(4).Infof("checkNodeDisks: ManagedObjectNotFound for Kubernetes node: %s with vSphere Virtual Machine reference: %v", nodeName, nodeInfo.vm)
+						continue
+					}
+					return err
+				}
+				klog.V(4).Infof("Verifying Volume Paths by devices for node %s and VM %s", nodeName, nodeInfo.vm)
+				vs.vsphereVolumeMap.Add(nodeName, devices)
+			}
+			return nil
+		}
+		return err
+	}
+
+	vmMoMap := make(map[string]mo.VirtualMachine)
+	for _, vmMo := range vmMoList {
+		if vmMo.Config == nil {
+			klog.Errorf("Config is not available for VM: %q", vmMo.Name)
+			continue
+		}
+		klog.V(9).Infof("vmMoMap vmname: %q vmuuid: %s", vmMo.Name, strings.ToLower(vmMo.Config.Uuid))
+		vmMoMap[strings.ToLower(vmMo.Config.Uuid)] = vmMo
+	}
+
+	klog.V(9).Infof("vmMoMap: +%v", vmMoMap)
+
+	for _, nodeName := range nodeNames {
+		node, err := vs.nodeManager.GetNode(nodeName)
+		if err != nil {
+			return err
+		}
+		nodeUUID, err := GetNodeUUID(&node)
+		if err != nil {
+			klog.Errorf("Node Discovery failed to get node uuid for node %s with error: %v", node.Name, err)
+			return err
+		}
+		nodeUUID = strings.ToLower(nodeUUID)
+		klog.V(9).Infof("Verifying volume for node %s with nodeuuid %q: %v", nodeName, nodeUUID, vmMoMap)
+		vmMo := vmMoMap[nodeUUID]
+		vmDevices := object.VirtualDeviceList(vmMo.Config.Hardware.Device)
+		vs.vsphereVolumeMap.Add(nodeName, vmDevices)
+	}
+	return nil
+}
+
 func (vs *VSphere) GetNodeNameFromProviderID(providerID string) (string, error) {
 	var nodeName string
 	nodes, err := vs.nodeManager.GetNodeDetails()
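BuildMissingVolumeNodeMap groups the unverified nodes by a vCenter/datacenter key and checks each group in its own goroutine, waiting for all of them with a sync.WaitGroup. A stripped-down sketch of that fan-out pattern under the same structure (the group keys and checkGroup function are placeholders, not the real vclib calls):

package main

import (
	"context"
	"fmt"
	"sync"
)

// checkGroup stands in for the per-VC-DC disk check; here it just reports the group.
func checkGroup(ctx context.Context, key string, nodes []string) error {
	fmt.Printf("checking %d node(s) in %s\n", len(nodes), key)
	return nil
}

func main() {
	ctx := context.Background()

	// Nodes segregated by a vCenter+datacenter key, as the diff does with dcNodes.
	dcNodes := map[string][]string{
		"vc1/dc1": {"node1.lan", "node2.lan"},
		"vc2/dc1": {"node3.lan"},
	}

	var wg sync.WaitGroup
	for key, nodes := range dcNodes {
		wg.Add(1)
		// One goroutine per group; loop variables are passed as arguments so
		// each goroutine works on its own copy.
		go func(key string, nodes []string) {
			defer wg.Done()
			if err := checkGroup(ctx, key, nodes); err != nil {
				fmt.Printf("check failed for %s: %v\n", key, err)
			}
		}(key, nodes)
	}
	wg.Wait()
}

Passing the loop variables as goroutine arguments avoids the classic captured-loop-variable pitfall in pre-Go 1.22 code; the diff itself passes nodeNames into the closure for the same reason.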
@@ -36,12 +36,14 @@ type nodeVolumeStatus struct {
 // VsphereVolumeMap stores last known state of node and volume mapping
 type VsphereVolumeMap struct {
 	volumeNodeMap map[volumePath]nodeVolumeStatus
+	nodeMap       map[k8stypes.NodeName]bool
 	lock          sync.RWMutex
 }
 
 func NewVsphereVolumeMap() *VsphereVolumeMap {
 	return &VsphereVolumeMap{
 		volumeNodeMap: map[volumePath]nodeVolumeStatus{},
+		nodeMap:       map[k8stypes.NodeName]bool{},
 	}
 }
 
@@ -54,6 +56,9 @@ func (vsphereVolume *VsphereVolumeMap) StartDiskVerification() {
 		v.verified = false
 		vsphereVolume.volumeNodeMap[k] = v
 	}
+	// reset nodeMap to empty so that any node we could not verify via usual verification process
+	// can still be verified.
+	vsphereVolume.nodeMap = map[k8stypes.NodeName]bool{}
 }
 
 // CheckForVolume verifies if disk is attached to some node in the cluster.
@@ -69,6 +74,16 @@ func (vsphereVolume *VsphereVolumeMap) CheckForVolume(path string) (k8stypes.Nod
 	return "", false
 }
 
+// CheckForNode returns true if given node has already been processed by volume
+// verification mechanism. This is used to skip verifying attached disks on nodes
+// which were previously verified.
+func (vsphereVolume *VsphereVolumeMap) CheckForNode(nodeName k8stypes.NodeName) bool {
+	vsphereVolume.lock.RLock()
+	defer vsphereVolume.lock.RUnlock()
+	_, ok := vsphereVolume.nodeMap[nodeName]
+	return ok
+}
+
 // Add all devices found on a node to the device map
 func (vsphereVolume *VsphereVolumeMap) Add(node k8stypes.NodeName, vmDevices object.VirtualDeviceList) {
 	vsphereVolume.lock.Lock()
@@ -79,6 +94,7 @@ func (vsphereVolume *VsphereVolumeMap) Add(node k8stypes.NodeName, vmDevices obj
 		if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
 			filename := volumePath(backing.FileName)
 			vsphereVolume.volumeNodeMap[filename] = nodeVolumeStatus{node, true}
+			vsphereVolume.nodeMap[node] = true
 		}
 	}
 }
@@ -91,6 +107,7 @@ func (vsphereVolume *VsphereVolumeMap) RemoveUnverified() {
 	for k, v := range vsphereVolume.volumeNodeMap {
 		if !v.verified {
 			delete(vsphereVolume.volumeNodeMap, k)
+			delete(vsphereVolume.nodeMap, v.nodeName)
 		}
 	}
 }
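The new nodeMap is read concurrently by the per-VC-DC goroutines (via CheckForNode) while Add may write to it, which is why the existing sync.RWMutex guards it as well: readers take RLock, writers take Lock. A minimal self-contained sketch of that pattern, with a toy type standing in for VsphereVolumeMap (illustrative only):

package main

import (
	"fmt"
	"sync"
)

// nodeSet is a concurrency-safe membership set, guarded the same way the
// diff guards nodeMap: RLock for reads, Lock for writes.
type nodeSet struct {
	mu    sync.RWMutex
	nodes map[string]bool
}

func newNodeSet() *nodeSet { return &nodeSet{nodes: map[string]bool{}} }

func (s *nodeSet) Add(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nodes[name] = true
}

func (s *nodeSet) Has(name string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.nodes[name]
	return ok
}

func main() {
	set := newNodeSet()
	set.Add("node1.lan")

	var wg sync.WaitGroup
	for _, n := range []string{"node1.lan", "node2.lan"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			fmt.Printf("%s seen: %v\n", n, set.Has(n)) // concurrent reads are safe
		}(n)
	}
	wg.Wait()
}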
@@ -31,31 +31,63 @@ func TestVsphereVolumeMap(t *testing.T) {
 		name        string
 		deviceToAdd object.VirtualDeviceList
 		nodeToAdd   k8stypes.NodeName
-		volumeToCheck   string
-		runVerification bool
-		expectInMap     bool
+		checkRunner func(volumeMap *VsphereVolumeMap)
 	}{
 		{
 			name:        "adding new volume",
 			deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"),
 			nodeToAdd:   convertToK8sType("node1.lan"),
-			volumeToCheck: "[foobar] kubevols/foo.vmdk",
-			expectInMap:   true,
+			checkRunner: func(volumeMap *VsphereVolumeMap) {
+				volumeToCheck := "[foobar] kubevols/foo.vmdk"
+				_, ok := volumeMap.CheckForVolume(volumeToCheck)
+				if !ok {
+					t.Errorf("error checking volume %s, expected true got %v", volumeToCheck, ok)
+				}
+			},
 		},
 		{
 			name:        "mismatching volume",
 			deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"),
 			nodeToAdd:   convertToK8sType("node1.lan"),
-			volumeToCheck: "[foobar] kubevols/bar.vmdk",
-			expectInMap:   false,
+			checkRunner: func(volumeMap *VsphereVolumeMap) {
+				volumeToCheck := "[foobar] kubevols/bar.vmdk"
+				_, ok := volumeMap.CheckForVolume(volumeToCheck)
+				if ok {
+					t.Errorf("error checking volume %s, expected false got %v", volumeToCheck, ok)
+				}
+			},
 		},
 		{
 			name:        "should remove unverified devices",
 			deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"),
 			nodeToAdd:   convertToK8sType("node1.lan"),
-			volumeToCheck:   "[foobar] kubevols/foo.vmdk",
-			runVerification: true,
-			expectInMap:     false,
+			checkRunner: func(volumeMap *VsphereVolumeMap) {
+				volumeMap.StartDiskVerification()
+				volumeMap.RemoveUnverified()
+				volumeToCheck := "[foobar] kubevols/foo.vmdk"
+				_, ok := volumeMap.CheckForVolume(volumeToCheck)
+				if ok {
+					t.Errorf("error checking volume %s, expected false got %v", volumeToCheck, ok)
+				}
+				node := k8stypes.NodeName("node1.lan")
+				ok = volumeMap.CheckForNode(node)
+				if ok {
+					t.Errorf("unexpected node %s in node map", node)
+				}
+			},
+		},
+		{
+			name:        "node check should return false for previously added node",
+			deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"),
+			nodeToAdd:   convertToK8sType("node1.lan"),
+			checkRunner: func(volumeMap *VsphereVolumeMap) {
+				volumeMap.StartDiskVerification()
+				node := k8stypes.NodeName("node1.lan")
+				ok := volumeMap.CheckForNode(node)
+				if ok {
+					t.Errorf("unexpected node %s in node map", node)
+				}
+			},
 		},
 	}
 
@@ -63,15 +95,7 @@ func TestVsphereVolumeMap(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			vMap := NewVsphereVolumeMap()
 			vMap.Add(tc.nodeToAdd, tc.deviceToAdd)
-			if tc.runVerification {
-				vMap.StartDiskVerification()
-				vMap.RemoveUnverified()
-			}
-
-			_, ok := vMap.CheckForVolume(tc.volumeToCheck)
-			if ok != tc.expectInMap {
-				t.Errorf("error checking volume %s, expected %v got %v", tc.volumeToCheck, tc.expectInMap, ok)
-			}
+			tc.checkRunner(vMap)
 		})
 	}
 }
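The test refactor replaces fixed expectation fields with a per-case checkRunner closure, so each case can drive its own sequence of calls and assertions against the shared fixture. A small, self-contained example of the same table-driven-closure pattern (the counter fixture is illustrative, not the vSphere test itself):

package example

import "testing"

// counter is a stand-in fixture, playing the role VsphereVolumeMap plays in the diff.
type counter struct{ n int }

func (c *counter) Inc()       { c.n++ }
func (c *counter) Reset()     { c.n = 0 }
func (c *counter) Value() int { return c.n }

func TestCounter(t *testing.T) {
	tests := []struct {
		name        string
		checkRunner func(c *counter) // each case drives its own calls and assertions
	}{
		{
			name: "increment once",
			checkRunner: func(c *counter) {
				c.Inc()
				if got := c.Value(); got != 1 {
					t.Errorf("expected 1, got %d", got)
				}
			},
		},
		{
			name: "reset clears the count",
			checkRunner: func(c *counter) {
				c.Inc()
				c.Reset()
				if got := c.Value(); got != 0 {
					t.Errorf("expected 0, got %d", got)
				}
			},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			c := &counter{}
			tc.checkRunner(c)
		})
	}
}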