diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go
index 42c87c8f9b3..04c49a1aba3 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go
@@ -311,6 +311,17 @@ func (nm *NodeManager) GetNodeDetails() ([]NodeDetails, error) {
 	return nodeDetails, nil
 }
 
+// GetNodeNames returns a list of nodes that are known to the vSphere cloud provider.
+// These are typically the nodes that make up the k8s cluster.
+func (nm *NodeManager) GetNodeNames() []k8stypes.NodeName {
+	nodes := nm.getNodes()
+	var nodeNameList []k8stypes.NodeName
+	for _, node := range nodes {
+		nodeNameList = append(nodeNameList, k8stypes.NodeName(node.Name))
+	}
+	return nodeNameList
+}
+
 func (nm *NodeManager) refreshNodes() (errList []error) {
 	for nodeName := range nm.getNodes() {
 		nodeInfo, err := nm.getRefreshedNodeInfo(convertToK8sType(nodeName))
diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go
index f1b81844398..83337adc7c2 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go
@@ -1221,9 +1221,15 @@ func (vs *VSphere) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string)
 			}
 		}
 
+		klog.V(4).Infof("DisksAreAttached successfully executed. result: %+v", attached)
+		// There could be nodes in the cluster which do not have any pods with vsphere volumes running on them.
+		// Such nodes won't be part of the nodeVolumes map because the attach-detach controller does not keep
+		// track of such nodes. But such nodes may still have dangling volumes attached to them, so we need to
+		// scan all the remaining nodes which weren't covered by the check above.
+		vs.BuildMissingVolumeNodeMap(ctx)
 		// any volume which we could not verify will be removed from the map.
 		vs.vsphereVolumeMap.RemoveUnverified()
-		klog.V(4).Infof("DisksAreAttach successfully executed. result: %+v", attached)
+		klog.V(4).Infof("current node volume map is: %+v", vs.vsphereVolumeMap.volumeNodeMap)
 		return disksAttached, nil
 	}
 	requestTime := time.Now()
diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_util.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_util.go
index b297113771d..c0d474ab557 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_util.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_util.go
@@ -26,6 +26,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/vmware/govmomi/find"
@@ -608,6 +609,123 @@ func (vs *VSphere) checkDiskAttached(ctx context.Context, nodes []k8stypes.NodeN
 	return nodesToRetry, nil
 }
 
+// BuildMissingVolumeNodeMap builds a map of volumes and nodes which are not known to the attach-detach controller.
+// There could be nodes in the cluster which do not have any pods with vsphere volumes running on them.
+// Such nodes won't be part of the disk verification check because the attach-detach controller does not keep
+// track of such nodes. But such nodes may still have dangling volumes attached to them, so we need to
+// scan all the remaining nodes which weren't covered by the check above.
+func (vs *VSphere) BuildMissingVolumeNodeMap(ctx context.Context) {
+	nodeNames := vs.nodeManager.GetNodeNames()
+	// Segregate nodes according to VC-DC
+	dcNodes := make(map[string][]k8stypes.NodeName)
+
+	for _, nodeName := range nodeNames {
+		// if the given node is not in the node volume map, queue it for scanning
+		if !vs.vsphereVolumeMap.CheckForNode(nodeName) {
+			nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName)
+			if err != nil {
+				klog.V(4).Infof("Failed to get node info for node %s. err: %+v", nodeName, err)
+				continue
+			}
+			vcDC := nodeInfo.vcServer + nodeInfo.dataCenter.String()
+			dcNodes[vcDC] = append(dcNodes[vcDC], nodeName)
+		}
+	}
+
+	var wg sync.WaitGroup
+
+	for _, nodeNames := range dcNodes {
+		// Start goroutines per VC-DC to check whether disks are attached
+		wg.Add(1)
+		go func(nodes []k8stypes.NodeName) {
+			err := vs.checkNodeDisks(ctx, nodes)
+			if err != nil {
+				klog.Errorf("Failed to check disk attached for nodes: %+v. err: %+v", nodes, err)
+			}
+			wg.Done()
+		}(nodeNames)
+	}
+	wg.Wait()
+}
+
+func (vs *VSphere) checkNodeDisks(ctx context.Context, nodeNames []k8stypes.NodeName) error {
+	var vmList []*vclib.VirtualMachine
+	var nodeInfo NodeInfo
+	var err error
+
+	for _, nodeName := range nodeNames {
+		nodeInfo, err = vs.nodeManager.GetNodeInfo(nodeName)
+		if err != nil {
+			return err
+		}
+		vmList = append(vmList, nodeInfo.vm)
+	}
+
+	// Make sure the session is valid
+	_, err = vs.getVSphereInstanceForServer(nodeInfo.vcServer, ctx)
+	if err != nil {
+		return err
+	}
+
+	// If any of the nodes are not present, the property collector query will fail for the entire operation
+	vmMoList, err := nodeInfo.dataCenter.GetVMMoList(ctx, vmList, []string{"config.hardware.device", "name", "config.uuid"})
+	if err != nil {
+		if vclib.IsManagedObjectNotFoundError(err) {
+			klog.V(4).Infof("checkNodeDisks: ManagedObjectNotFound for property collector query for nodes: %+v vms: %+v", nodeNames, vmList)
+			// The property collector query failed, so fall back to
+			// verifying volume paths per VM.
+			for _, nodeName := range nodeNames {
+				nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName)
+				if err != nil {
+					return err
+				}
+				devices, err := nodeInfo.vm.VirtualMachine.Device(ctx)
+				if err != nil {
+					if vclib.IsManagedObjectNotFoundError(err) {
+						klog.V(4).Infof("checkNodeDisks: ManagedObjectNotFound for Kubernetes node: %s with vSphere Virtual Machine reference: %v", nodeName, nodeInfo.vm)
+						continue
+					}
+					return err
+				}
+				klog.V(4).Infof("Verifying Volume Paths by devices for node %s and VM %s", nodeName, nodeInfo.vm)
+				vs.vsphereVolumeMap.Add(nodeName, devices)
+			}
+			return nil
+		}
+		return err
+	}
+
+	vmMoMap := make(map[string]mo.VirtualMachine)
+	for _, vmMo := range vmMoList {
+		if vmMo.Config == nil {
+			klog.Errorf("Config is not available for VM: %q", vmMo.Name)
+			continue
+		}
+		klog.V(9).Infof("vmMoMap vmname: %q vmuuid: %s", vmMo.Name, strings.ToLower(vmMo.Config.Uuid))
+		vmMoMap[strings.ToLower(vmMo.Config.Uuid)] = vmMo
+	}
+
+	klog.V(9).Infof("vmMoMap: %+v", vmMoMap)
+
+	for _, nodeName := range nodeNames {
+		node, err := vs.nodeManager.GetNode(nodeName)
+		if err != nil {
+			return err
+		}
+		nodeUUID, err := GetNodeUUID(&node)
+		if err != nil {
+			klog.Errorf("Node Discovery failed to get node uuid for node %s with error: %v", node.Name, err)
+			return err
+		}
+		nodeUUID = strings.ToLower(nodeUUID)
+		klog.V(9).Infof("Verifying volume for node %s with nodeuuid %q: %v", nodeName, nodeUUID, vmMoMap)
+		vmMo := vmMoMap[nodeUUID]
+		vmDevices := object.VirtualDeviceList(vmMo.Config.Hardware.Device)
+		vs.vsphereVolumeMap.Add(nodeName, vmDevices)
+	}
+	return nil
+}
+
 func (vs *VSphere) GetNodeNameFromProviderID(providerID string) (string, error) {
 	var nodeName string
 	nodes, err := vs.nodeManager.GetNodeDetails()
diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map.go
index 737e5a0d248..6895bf90280 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map.go
@@ -36,12 +36,14 @@ type nodeVolumeStatus struct {
 // VsphereVolumeMap stores last known state of node and volume mapping
 type VsphereVolumeMap struct {
 	volumeNodeMap map[volumePath]nodeVolumeStatus
+	nodeMap       map[k8stypes.NodeName]bool
 	lock          sync.RWMutex
 }
 
 func NewVsphereVolumeMap() *VsphereVolumeMap {
 	return &VsphereVolumeMap{
 		volumeNodeMap: map[volumePath]nodeVolumeStatus{},
+		nodeMap:       map[k8stypes.NodeName]bool{},
 	}
 }
 
@@ -54,6 +56,9 @@ func (vsphereVolume *VsphereVolumeMap) StartDiskVerification() {
 		v.verified = false
 		vsphereVolume.volumeNodeMap[k] = v
 	}
+	// reset nodeMap to empty so that any node we could not verify via the usual verification process
+	// can still be verified.
+	vsphereVolume.nodeMap = map[k8stypes.NodeName]bool{}
 }
 
 // CheckForVolume verifies if disk is attached to some node in the cluster.
@@ -69,6 +74,16 @@ func (vsphereVolume *VsphereVolumeMap) CheckForVolume(path string) (k8stypes.Nod
 	return "", false
 }
 
+// CheckForNode returns true if the given node has already been processed by the
+// volume verification mechanism. This is used to skip verifying attached disks on
+// nodes which were previously verified.
+func (vsphereVolume *VsphereVolumeMap) CheckForNode(nodeName k8stypes.NodeName) bool {
+	vsphereVolume.lock.RLock()
+	defer vsphereVolume.lock.RUnlock()
+	_, ok := vsphereVolume.nodeMap[nodeName]
+	return ok
+}
+
 // Add all devices found on a node to the device map
 func (vsphereVolume *VsphereVolumeMap) Add(node k8stypes.NodeName, vmDevices object.VirtualDeviceList) {
 	vsphereVolume.lock.Lock()
@@ -79,6 +94,7 @@ func (vsphereVolume *VsphereVolumeMap) Add(node k8stypes.NodeName, vmDevices obj
 		if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
 			filename := volumePath(backing.FileName)
 			vsphereVolume.volumeNodeMap[filename] = nodeVolumeStatus{node, true}
+			vsphereVolume.nodeMap[node] = true
 		}
 	}
 }
@@ -91,6 +107,7 @@ func (vsphereVolume *VsphereVolumeMap) RemoveUnverified() {
 	for k, v := range vsphereVolume.volumeNodeMap {
 		if !v.verified {
 			delete(vsphereVolume.volumeNodeMap, k)
+			delete(vsphereVolume.nodeMap, v.nodeName)
 		}
 	}
 }
diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map_test.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map_test.go
index ade8be5f03a..113e6b1d149 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere_volume_map_test.go
@@ -28,34 +28,66 @@ import (
 
 func TestVsphereVolumeMap(t *testing.T) {
 	tests := []struct {
-		name            string
-		deviceToAdd     object.VirtualDeviceList
-		nodeToAdd       k8stypes.NodeName
-		volumeToCheck   string
-		runVerification bool
-		expectInMap     bool
+		name        string
+		deviceToAdd object.VirtualDeviceList
+		nodeToAdd   k8stypes.NodeName
+		checkRunner func(volumeMap *VsphereVolumeMap)
 	}{
 		{
-			name:          "adding new volume",
-			deviceToAdd:   getVirtualDeviceList("[foobar] kubevols/foo.vmdk"),
-			nodeToAdd:     convertToK8sType("node1.lan"),
kubevols/foo.vmdk", - expectInMap: true, + name: "adding new volume", + deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"), + nodeToAdd: convertToK8sType("node1.lan"), + checkRunner: func(volumeMap *VsphereVolumeMap) { + volumeToCheck := "[foobar] kubevols/foo.vmdk" + _, ok := volumeMap.CheckForVolume(volumeToCheck) + if !ok { + t.Errorf("error checking volume %s, expected true got %v", volumeToCheck, ok) + } + }, }, { - name: "mismatching volume", - deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"), - nodeToAdd: convertToK8sType("node1.lan"), - volumeToCheck: "[foobar] kubevols/bar.vmdk", - expectInMap: false, + name: "mismatching volume", + deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"), + nodeToAdd: convertToK8sType("node1.lan"), + checkRunner: func(volumeMap *VsphereVolumeMap) { + volumeToCheck := "[foobar] kubevols/bar.vmdk" + _, ok := volumeMap.CheckForVolume(volumeToCheck) + if ok { + t.Errorf("error checking volume %s, expected false got %v", volumeToCheck, ok) + } + }, }, { - name: "should remove unverified devices", - deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"), - nodeToAdd: convertToK8sType("node1.lan"), - volumeToCheck: "[foobar] kubevols/foo.vmdk", - runVerification: true, - expectInMap: false, + name: "should remove unverified devices", + deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"), + nodeToAdd: convertToK8sType("node1.lan"), + checkRunner: func(volumeMap *VsphereVolumeMap) { + volumeMap.StartDiskVerification() + volumeMap.RemoveUnverified() + volumeToCheck := "[foobar] kubevols/foo.vmdk" + _, ok := volumeMap.CheckForVolume(volumeToCheck) + if ok { + t.Errorf("error checking volume %s, expected false got %v", volumeToCheck, ok) + } + node := k8stypes.NodeName("node1.lan") + ok = volumeMap.CheckForNode(node) + if ok { + t.Errorf("unexpected node %s in node map", node) + } + }, + }, + { + name: "node check should return false for previously added node", + deviceToAdd: getVirtualDeviceList("[foobar] kubevols/foo.vmdk"), + nodeToAdd: convertToK8sType("node1.lan"), + checkRunner: func(volumeMap *VsphereVolumeMap) { + volumeMap.StartDiskVerification() + node := k8stypes.NodeName("node1.lan") + ok := volumeMap.CheckForNode(node) + if ok { + t.Errorf("unexpected node %s in node map", node) + } + }, }, } @@ -63,15 +95,7 @@ func TestVsphereVolumeMap(t *testing.T) { t.Run(tc.name, func(t *testing.T) { vMap := NewVsphereVolumeMap() vMap.Add(tc.nodeToAdd, tc.deviceToAdd) - - if tc.runVerification { - vMap.StartDiskVerification() - vMap.RemoveUnverified() - } - _, ok := vMap.CheckForVolume(tc.volumeToCheck) - if ok != tc.expectInMap { - t.Errorf("error checking volume %s, expected %v got %v", tc.volumeToCheck, tc.expectInMap, ok) - } + tc.checkRunner(vMap) }) } }