diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go
index 21e074be7bc..8d52f0c906b 100644
--- a/pkg/cloudprovider/providers/aws/aws.go
+++ b/pkg/cloudprovider/providers/aws/aws.go
@@ -330,8 +330,8 @@ type Volumes interface {
 	// Check if the volume is already attached to the node with the specified NodeName
 	DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeName) (bool, error)
 
-	// Check if a list of volumes are attached to the node with the specified NodeName
-	DisksAreAttached(diskNames []KubernetesVolumeID, nodeName types.NodeName) (map[KubernetesVolumeID]bool, error)
+	// Check if the disks specified in the argument map are still attached to their respective nodes.
+	DisksAreAttached(map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error)
 }
 
 // InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@@ -1777,36 +1777,66 @@ func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeN
 	return false, nil
 }
 
-func (c *Cloud) DisksAreAttached(diskNames []KubernetesVolumeID, nodeName types.NodeName) (map[KubernetesVolumeID]bool, error) {
-	idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
-	attached := make(map[KubernetesVolumeID]bool)
-	for _, diskName := range diskNames {
-		volumeID, err := diskName.mapToAWSVolumeID()
-		if err != nil {
-			return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
-		}
-		idToDiskName[volumeID] = diskName
-		attached[diskName] = false
+func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error) {
+	attached := make(map[types.NodeName]map[KubernetesVolumeID]bool)
+
+	if len(nodeDisks) == 0 {
+		return attached, nil
 	}
-	_, instance, err := c.getFullInstance(nodeName)
+
+	dnsNameSlice := []string{}
+	for nodeName, diskNames := range nodeDisks {
+		for _, diskName := range diskNames {
+			setNodeDisk(attached, diskName, nodeName, false)
+		}
+		dnsNameSlice = append(dnsNameSlice, mapNodeNameToPrivateDNSName(nodeName))
+	}
+
+	awsInstances, err := c.getInstancesByNodeNames(dnsNameSlice)
 	if err != nil {
-		if err == cloudprovider.InstanceNotFound {
+		// When there is an error fetching instance information,
+		// it is safer to return nil and leave the volume information untouched.
+		return nil, err
+	}
+
+	if len(awsInstances) == 0 {
+		glog.V(2).Infof("DisksAreAttached will assume no disks are attached to any node in the AWS cluster.")
+		return attached, nil
+	}
+
+	awsInstanceMap := make(map[types.NodeName]*ec2.Instance)
+	for _, awsInstance := range awsInstances {
+		awsInstanceMap[mapInstanceToNodeName(awsInstance)] = awsInstance
+	}
+
+	// Note that we check that the volume is attached to the correct node, not that it is attached to _a_ node
+	for nodeName, diskNames := range nodeDisks {
+		awsInstance := awsInstanceMap[nodeName]
+		if awsInstance == nil {
 			// If instance no longer exists, safe to assume volume is not attached.
 			glog.Warningf(
 				"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
 				nodeName,
 				diskNames)
-			return attached, nil
+			continue
 		}
-		return attached, err
-	}
-	for _, blockDevice := range instance.BlockDeviceMappings {
-		volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
-		diskName, found := idToDiskName[volumeID]
-		if found {
-			// Disk is still attached to node
-			attached[diskName] = true
+
+		idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
+		for _, diskName := range diskNames {
+			volumeID, err := diskName.mapToAWSVolumeID()
+			if err != nil {
+				return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
+			}
+			idToDiskName[volumeID] = diskName
+		}
+
+		for _, blockDevice := range awsInstance.BlockDeviceMappings {
+			volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
+			diskName, found := idToDiskName[volumeID]
+			if found {
+				// Disk is still attached to node
+				setNodeDisk(attached, diskName, nodeName, true)
+			}
 		}
 	}
@@ -3146,7 +3176,24 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 			return c.lastInstancesByNodeNames, nil
 		}
 	}
-	names := aws.StringSlice(nodeNames.List())
+	instances, err := c.getInstancesByNodeNames(nodeNames.List())
+
+	if err != nil {
+		return nil, err
+	}
+
+	if len(instances) == 0 {
+		return nil, nil
+	}
+
+	glog.V(2).Infof("Caching instances for %v", nodeNames)
+	c.lastNodeNames = nodeNames
+	c.lastInstancesByNodeNames = instances
+	return instances, nil
+}
+
+func (c *Cloud) getInstancesByNodeNames(nodeNames []string) ([]*ec2.Instance, error) {
+	names := aws.StringSlice(nodeNames)
 
 	nodeNameFilter := &ec2.Filter{
 		Name:   aws.String("private-dns-name"),
@@ -3168,10 +3215,6 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
 		glog.V(3).Infof("Failed to find any instances %v", nodeNames)
 		return nil, nil
 	}
-
-	glog.V(2).Infof("Caching instances for %v", nodeNames)
-	c.lastNodeNames = nodeNames
-	c.lastInstancesByNodeNames = instances
 	return instances, nil
 }
 
@@ -3251,3 +3294,18 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
 	awsInstance := newAWSInstance(c.ec2, instance)
 	return awsInstance, instance, err
 }
+
+func setNodeDisk(
+	nodeDiskMap map[types.NodeName]map[KubernetesVolumeID]bool,
+	volumeID KubernetesVolumeID,
+	nodeName types.NodeName,
+	check bool) {
+
+	volumeMap := nodeDiskMap[nodeName]
+
+	if volumeMap == nil {
+		volumeMap = make(map[KubernetesVolumeID]bool)
+		nodeDiskMap[nodeName] = volumeMap
+	}
+	volumeMap[volumeID] = check
+}
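
Reviewer note: a minimal sketch of how a caller exercises the new bulk signature. The cloud handle c, the node names, and the volume IDs below are hypothetical; types.NodeName and KubernetesVolumeID are the string-backed types from the interface above.

// Sketch only - not part of the patch.
nodeDisks := map[types.NodeName][]KubernetesVolumeID{
	"ip-10-0-0-1.ec2.internal": {"aws://us-east-1a/vol-0abc", "aws://us-east-1a/vol-0def"},
	"ip-10-0-0-2.ec2.internal": {"aws://us-east-1b/vol-0123"},
}
attached, err := c.DisksAreAttached(nodeDisks)
if err != nil {
	return err // on error, callers must leave attachment state untouched
}
for nodeName, disks := range attached {
	for diskName, isAttached := range disks {
		glog.V(4).Infof("node %q disk %q attached=%v", nodeName, diskName, isAttached)
	}
}
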
diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
index 1856eec9164..505e11e0718 100644
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -121,12 +121,7 @@ func (rc *reconciler) updateSyncTime() {
 
 func (rc *reconciler) syncStates() {
 	volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
-	for nodeName, volumes := range volumesPerNode {
-		err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld)
-		if err != nil {
-			glog.Errorf("Error in syncing states for volumes: %v", err)
-		}
-	}
+	rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
 }
 
 func (rc *reconciler) reconcile() {
diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go
index d3725074d2d..24e7f2181f6 100644
--- a/pkg/controller/volume/persistentvolume/framework_test.go
+++ b/pkg/controller/volume/persistentvolume/framework_test.go
@@ -1118,6 +1118,10 @@ func (plugin *mockVolumePlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *mockVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) {
 	return nil, nil
 }
diff --git a/pkg/volume/aws_ebs/attacher.go b/pkg/volume/aws_ebs/attacher.go
index 84bcc1722e4..9533cdeab53 100644
--- a/pkg/volume/aws_ebs/attacher.go
+++ b/pkg/volume/aws_ebs/attacher.go
@@ -78,37 +78,67 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, nodeName
 }
 
 func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
-	volumesAttachedCheck := make(map[*volume.Spec]bool)
-	volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
-	volumeIDList := []aws.KubernetesVolumeID{}
-	for _, spec := range specs {
-		volumeSource, _, err := getVolumeSource(spec)
-		if err != nil {
-			glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
-			continue
-		}
-		name := aws.KubernetesVolumeID(volumeSource.VolumeID)
-		volumeIDList = append(volumeIDList, name)
-		volumesAttachedCheck[spec] = true
-		volumeSpecMap[name] = spec
+	glog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for AWS", nodeName)
+	volumeNodeMap := map[types.NodeName][]*volume.Spec{
+		nodeName: specs,
 	}
-	attachedResult, err := attacher.awsVolumes.DisksAreAttached(volumeIDList, nodeName)
+	nodeVolumesResult := make(map[*volume.Spec]bool)
+	nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
 	if err != nil {
-		// Log error and continue with attach
-		glog.Errorf(
-			"Error checking if volumes (%v) is already attached to current node (%q). err=%v",
-			volumeIDList, nodeName, err)
+		glog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
+		return nodeVolumesResult, err
+	}
+
+	if result, ok := nodesVerificationMap[nodeName]; ok {
+		return result, nil
+	}
+	return nodeVolumesResult, nil
+}
+
+func (attacher *awsElasticBlockStoreAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
+	volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
+	diskNamesByNode := make(map[types.NodeName][]aws.KubernetesVolumeID)
+	volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
+
+	for nodeName, volumeSpecs := range volumesByNode {
+		for _, volumeSpec := range volumeSpecs {
+			volumeSource, _, err := getVolumeSource(volumeSpec)
+
+			if err != nil {
+				glog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
+				continue
+			}
+
+			name := aws.KubernetesVolumeID(volumeSource.VolumeID)
+			diskNamesByNode[nodeName] = append(diskNamesByNode[nodeName], name)
+
+			nodeDisk, nodeDiskExists := volumesAttachedCheck[nodeName]
+
+			if !nodeDiskExists {
+				nodeDisk = make(map[*volume.Spec]bool)
+			}
+			nodeDisk[volumeSpec] = true
+			volumeSpecMap[name] = volumeSpec
+			volumesAttachedCheck[nodeName] = nodeDisk
+		}
+	}
+	attachedResult, err := attacher.awsVolumes.DisksAreAttached(diskNamesByNode)
+
+	if err != nil {
+		glog.Errorf("Error checking if volumes are attached to nodes: %v", err)
 		return volumesAttachedCheck, err
 	}
 
-	for volumeID, attached := range attachedResult {
-		if !attached {
-			spec := volumeSpecMap[volumeID]
-			volumesAttachedCheck[spec] = false
-			glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
+	for nodeName, nodeDisks := range attachedResult {
+		for diskName, attached := range nodeDisks {
+			if !attached {
+				spec := volumeSpecMap[diskName]
+				setNodeDisk(volumesAttachedCheck, spec, nodeName, false)
+			}
 		}
 	}
+
 	return volumesAttachedCheck, nil
 }
@@ -249,3 +279,17 @@ func (detacher *awsElasticBlockStoreDetacher) Detach(deviceMountPath string, nod
 func (detacher *awsElasticBlockStoreDetacher) UnmountDevice(deviceMountPath string) error {
 	return volumeutil.UnmountPath(deviceMountPath, detacher.mounter)
 }
+
+func setNodeDisk(
+	nodeDiskMap map[types.NodeName]map[*volume.Spec]bool,
+	volumeSpec *volume.Spec,
+	nodeName types.NodeName,
+	check bool) {
+
+	volumeMap := nodeDiskMap[nodeName]
+	if volumeMap == nil {
+		volumeMap = make(map[*volume.Spec]bool)
+		nodeDiskMap[nodeName] = volumeMap
+	}
+	volumeMap[volumeSpec] = check
+}
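
Reviewer note: VolumesAreAttached is now a thin wrapper over the bulk path, so both entry points share one implementation. A hedged usage sketch; specA/specB and the node name are placeholders.

// Sketch only - not part of the patch.
volumesByNode := map[types.NodeName][]*volume.Spec{
	"node-a": {specA, specB},
}
verified, err := attacher.BulkVerifyVolumes(volumesByNode)
if err == nil {
	for node, specs := range verified {
		for spec, stillAttached := range specs {
			glog.V(4).Infof("node %q volume %q attached=%v", node, spec.Name(), stillAttached)
		}
	}
}
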
err=%v", - volumeIDList, nodeName, err) + glog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err) + return nodeVolumesResult, err + } + + if result, ok := nodesVerificationMap[nodeName]; ok { + return result, nil + } + return nodeVolumesResult, nil +} + +func (attacher *awsElasticBlockStoreAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool) + diskNamesByNode := make(map[types.NodeName][]aws.KubernetesVolumeID) + volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec) + + for nodeName, volumeSpecs := range volumesByNode { + for _, volumeSpec := range volumeSpecs { + volumeSource, _, err := getVolumeSource(volumeSpec) + + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err) + continue + } + + name := aws.KubernetesVolumeID(volumeSource.VolumeID) + diskNamesByNode[nodeName] = append(diskNamesByNode[nodeName], name) + + nodeDisk, nodeDiskExists := volumesAttachedCheck[nodeName] + + if !nodeDiskExists { + nodeDisk = make(map[*volume.Spec]bool) + } + nodeDisk[volumeSpec] = true + volumeSpecMap[name] = volumeSpec + volumesAttachedCheck[nodeName] = nodeDisk + } + } + attachedResult, err := attacher.awsVolumes.DisksAreAttached(diskNamesByNode) + + if err != nil { + glog.Errorf("Error checking if volumes are attached to nodes err = %v", err) return volumesAttachedCheck, err } - for volumeID, attached := range attachedResult { - if !attached { - spec := volumeSpecMap[volumeID] - volumesAttachedCheck[spec] = false - glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name()) + for nodeName, nodeDisks := range attachedResult { + for diskName, attached := range nodeDisks { + if !attached { + spec := volumeSpecMap[diskName] + setNodeDisk(volumesAttachedCheck, spec, nodeName, false) + } } } + return volumesAttachedCheck, nil } @@ -249,3 +279,17 @@ func (detacher *awsElasticBlockStoreDetacher) Detach(deviceMountPath string, nod func (detacher *awsElasticBlockStoreDetacher) UnmountDevice(deviceMountPath string) error { return volumeutil.UnmountPath(deviceMountPath, detacher.mounter) } + +func setNodeDisk( + nodeDiskMap map[types.NodeName]map[*volume.Spec]bool, + volumeSpec *volume.Spec, + nodeName types.NodeName, + check bool) { + + volumeMap := nodeDiskMap[nodeName] + if volumeMap == nil { + volumeMap = make(map[*volume.Spec]bool) + nodeDiskMap[nodeName] = volumeMap + } + volumeMap[volumeSpec] = check +} diff --git a/pkg/volume/aws_ebs/attacher_test.go b/pkg/volume/aws_ebs/attacher_test.go index c36d6af9866..9ba08e5eb70 100644 --- a/pkg/volume/aws_ebs/attacher_test.go +++ b/pkg/volume/aws_ebs/attacher_test.go @@ -321,7 +321,7 @@ func (testcase *testcase) DiskIsAttached(diskName aws.KubernetesVolumeID, nodeNa return expected.isAttached, expected.ret } -func (testcase *testcase) DisksAreAttached(diskNames []aws.KubernetesVolumeID, nodeName types.NodeName) (map[aws.KubernetesVolumeID]bool, error) { +func (testcase *testcase) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) { return nil, errors.New("Not implemented") } diff --git a/pkg/volume/aws_ebs/aws_ebs.go b/pkg/volume/aws_ebs/aws_ebs.go index 736b400deec..af80b7273e8 100644 --- a/pkg/volume/aws_ebs/aws_ebs.go +++ b/pkg/volume/aws_ebs/aws_ebs.go @@ -91,6 +91,10 @@ func (plugin 
diff --git a/pkg/volume/azure_dd/azure_dd.go b/pkg/volume/azure_dd/azure_dd.go
index 12595e5df89..edffe4fd754 100644
--- a/pkg/volume/azure_dd/azure_dd.go
+++ b/pkg/volume/azure_dd/azure_dd.go
@@ -107,6 +107,10 @@ func (plugin *azureDataDiskPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *azureDataDiskPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *azureDataDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/azure_file/azure_file.go b/pkg/volume/azure_file/azure_file.go
index 2dd6625f1ef..b46a989929e 100644
--- a/pkg/volume/azure_file/azure_file.go
+++ b/pkg/volume/azure_file/azure_file.go
@@ -83,6 +83,10 @@ func (plugin *azureFilePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *azureFilePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *azureFilePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/cephfs/cephfs.go b/pkg/volume/cephfs/cephfs.go
index 53f90a074d7..043124cec88 100644
--- a/pkg/volume/cephfs/cephfs.go
+++ b/pkg/volume/cephfs/cephfs.go
@@ -76,6 +76,10 @@ func (plugin *cephfsPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *cephfsPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *cephfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/cinder/cinder.go b/pkg/volume/cinder/cinder.go
index 8e3067496d8..047e735568a 100644
--- a/pkg/volume/cinder/cinder.go
+++ b/pkg/volume/cinder/cinder.go
@@ -101,6 +101,10 @@ func (plugin *cinderPlugin) RequiresRemount() bool {
 
 func (plugin *cinderPlugin) SupportsMountOption() bool {
 	return true
+}
+
+func (plugin *cinderPlugin) SupportsBulkVolumeVerification() bool {
+	return false
 }
 
 func (plugin *cinderPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
diff --git a/pkg/volume/configmap/configmap.go b/pkg/volume/configmap/configmap.go
index e10419976aa..6d8d484dea6 100644
--- a/pkg/volume/configmap/configmap.go
+++ b/pkg/volume/configmap/configmap.go
@@ -80,6 +80,10 @@ func (plugin *configMapPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *configMapPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *configMapPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &configMapVolumeMounter{
 		configMapVolume: &configMapVolume{spec.Name(), pod.UID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}},
diff --git a/pkg/volume/downwardapi/downwardapi.go b/pkg/volume/downwardapi/downwardapi.go
index c09c94417ff..409aa56ac53 100644
--- a/pkg/volume/downwardapi/downwardapi.go
+++ b/pkg/volume/downwardapi/downwardapi.go
@@ -86,6 +86,10 @@ func (plugin *downwardAPIPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *downwardAPIPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *downwardAPIPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	v := &downwardAPIVolume{
 		volName: spec.Name(),
diff --git a/pkg/volume/empty_dir/empty_dir.go b/pkg/volume/empty_dir/empty_dir.go
index 70f95c0c67e..d953f2ac886 100644
--- a/pkg/volume/empty_dir/empty_dir.go
+++ b/pkg/volume/empty_dir/empty_dir.go
@@ -94,6 +94,10 @@ func (plugin *emptyDirPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *emptyDirPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *emptyDirPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return plugin.newMounterInternal(spec, pod, plugin.host.GetMounter(), &realMountDetector{plugin.host.GetMounter()}, opts)
 }
diff --git a/pkg/volume/fc/fc.go b/pkg/volume/fc/fc.go
index 8276f2f39c0..2d98773a99e 100644
--- a/pkg/volume/fc/fc.go
+++ b/pkg/volume/fc/fc.go
@@ -82,6 +82,10 @@ func (plugin *fcPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *fcPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *fcPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/flexvolume/plugin.go b/pkg/volume/flexvolume/plugin.go
index c35ab278155..b810b8cd998 100644
--- a/pkg/volume/flexvolume/plugin.go
+++ b/pkg/volume/flexvolume/plugin.go
@@ -177,6 +177,10 @@ func (plugin *flexVolumePlugin) unsupported(commands ...string) {
 	plugin.unsupportedCommands = append(plugin.unsupportedCommands, commands...)
 }
 
+func (plugin *flexVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 // Returns true iff the given command is known to be unsupported.
 func (plugin *flexVolumePlugin) isUnsupported(command string) bool {
 	plugin.Lock()
diff --git a/pkg/volume/flocker/flocker.go b/pkg/volume/flocker/flocker.go
index b8509f4915b..8f44fae6e4e 100644
--- a/pkg/volume/flocker/flocker.go
+++ b/pkg/volume/flocker/flocker.go
@@ -116,6 +116,10 @@ func (p *flockerPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *flockerPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/gce_pd/gce_pd.go b/pkg/volume/gce_pd/gce_pd.go
index 598928ba5fd..cc8915d31a0 100644
--- a/pkg/volume/gce_pd/gce_pd.go
+++ b/pkg/volume/gce_pd/gce_pd.go
@@ -86,6 +86,10 @@ func (plugin *gcePersistentDiskPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *gcePersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *gcePersistentDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/git_repo/git_repo.go b/pkg/volume/git_repo/git_repo.go
index 59f6fdfc2a0..18b61d9c3e4 100644
--- a/pkg/volume/git_repo/git_repo.go
+++ b/pkg/volume/git_repo/git_repo.go
@@ -85,6 +85,10 @@ func (plugin *gitRepoPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *gitRepoPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *gitRepoPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &gitRepoVolumeMounter{
 		gitRepoVolume: &gitRepoVolume{
diff --git a/pkg/volume/glusterfs/glusterfs.go b/pkg/volume/glusterfs/glusterfs.go
index 78c9c09dea2..9ec54bffacd 100644
--- a/pkg/volume/glusterfs/glusterfs.go
+++ b/pkg/volume/glusterfs/glusterfs.go
@@ -119,6 +119,10 @@ func (plugin *glusterfsPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *glusterfsPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *glusterfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go
index 93a590ccb29..9c49235c64a 100644
--- a/pkg/volume/host_path/host_path.go
+++ b/pkg/volume/host_path/host_path.go
@@ -87,6 +87,10 @@ func (plugin *hostPathPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *hostPathPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *hostPathPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/iscsi/iscsi.go b/pkg/volume/iscsi/iscsi.go
index 9d1f3f68e93..6ccf90cc711 100644
--- a/pkg/volume/iscsi/iscsi.go
+++ b/pkg/volume/iscsi/iscsi.go
@@ -86,6 +86,10 @@ func (plugin *iscsiPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *iscsiPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *iscsiPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go
index 5e57c093153..1e9ed12cd7b 100644
--- a/pkg/volume/nfs/nfs.go
+++ b/pkg/volume/nfs/nfs.go
@@ -92,6 +92,10 @@ func (plugin *nfsPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *nfsPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *nfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/photon_pd/photon_pd.go b/pkg/volume/photon_pd/photon_pd.go
index 424e5dbc82d..606c57b1a93 100644
--- a/pkg/volume/photon_pd/photon_pd.go
+++ b/pkg/volume/photon_pd/photon_pd.go
@@ -83,6 +83,10 @@ func (plugin *photonPersistentDiskPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *photonPersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *photonPersistentDiskPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
 	return plugin.newMounterInternal(spec, pod.UID, &PhotonDiskUtil{}, plugin.host.GetMounter())
 }
diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go
index 9c96ff45be5..b78c76d2f9c 100644
--- a/pkg/volume/plugins.go
+++ b/pkg/volume/plugins.go
@@ -112,6 +112,11 @@ type VolumePlugin interface {
 	// Specifying mount options in a volume plugin that doesn't support
 	// user specified mount options will result in error creating persistent volumes
 	SupportsMountOption() bool
+
+	// SupportsBulkVolumeVerification checks if the volume plugin type is capable
+	// of enabling bulk polling of all nodes. This can speed up verification of
+	// attached volumes by quite a bit, but the underlying plugin must support it.
+	SupportsBulkVolumeVerification() bool
 }
 
 // PersistentVolumePlugin is an extended interface of VolumePlugin and is used
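
Reviewer note: the new capability flag is consumed roughly as below; queueForBulkVerify and verifyPerNode are hypothetical stand-ins for the operation executor's real bookkeeping, which appears later in this patch.

// Sketch only - not part of the patch.
if volumePlugin.SupportsBulkVolumeVerification() {
	// Queue this volume for a single per-plugin bulk query across all nodes.
	queueForBulkVerify(volumePlugin.GetPluginName(), node, spec)
} else {
	// Fall back to one verification call per node.
	verifyPerNode(node, nodeVolumes)
}
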
diff --git a/pkg/volume/plugins_test.go b/pkg/volume/plugins_test.go
index 75e38330ead..da0aa133546 100644
--- a/pkg/volume/plugins_test.go
+++ b/pkg/volume/plugins_test.go
@@ -81,6 +81,10 @@ func (plugin *testPlugins) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *testPlugins) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *testPlugins) NewMounter(spec *Spec, podRef *v1.Pod, opts VolumeOptions) (Mounter, error) {
 	return nil, nil
 }
diff --git a/pkg/volume/portworx/portworx.go b/pkg/volume/portworx/portworx.go
index b89bfd3e051..96302e636a2 100644
--- a/pkg/volume/portworx/portworx.go
+++ b/pkg/volume/portworx/portworx.go
@@ -179,6 +179,10 @@ func (plugin *portworxVolumePlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *portworxVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func getVolumeSource(
 	spec *volume.Spec) (*v1.PortworxVolumeSource, bool, error) {
 	if spec.Volume != nil && spec.Volume.PortworxVolume != nil {
diff --git a/pkg/volume/projected/projected.go b/pkg/volume/projected/projected.go
index ecfa28843dc..5ff87d78ec0 100644
--- a/pkg/volume/projected/projected.go
+++ b/pkg/volume/projected/projected.go
@@ -96,6 +96,10 @@ func (plugin *projectedPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *projectedPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *projectedPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &projectedVolumeMounter{
 		projectedVolume: &projectedVolume{
diff --git a/pkg/volume/quobyte/quobyte.go b/pkg/volume/quobyte/quobyte.go
index 5ade721f5fa..68f647fa398 100644
--- a/pkg/volume/quobyte/quobyte.go
+++ b/pkg/volume/quobyte/quobyte.go
@@ -122,6 +122,10 @@ func (plugin *quobytePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *quobytePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *quobytePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/rbd/rbd.go b/pkg/volume/rbd/rbd.go
index 6a4aa559bbd..c32d09172ab 100644
--- a/pkg/volume/rbd/rbd.go
+++ b/pkg/volume/rbd/rbd.go
@@ -90,6 +90,10 @@ func (plugin *rbdPlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *rbdPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
 	return []v1.PersistentVolumeAccessMode{
 		v1.ReadWriteOnce,
diff --git a/pkg/volume/secret/secret.go b/pkg/volume/secret/secret.go
index 0347a362255..9f73bc80ae0 100644
--- a/pkg/volume/secret/secret.go
+++ b/pkg/volume/secret/secret.go
@@ -89,6 +89,10 @@ func (plugin *secretPlugin) SupportsMountOption() bool {
 	return false
 }
 
+func (plugin *secretPlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *secretPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
 	return &secretVolumeMounter{
 		secretVolume: &secretVolume{
diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go
index 22f5da6bcbb..79d1c07c92a 100644
--- a/pkg/volume/testing/testing.go
+++ b/pkg/volume/testing/testing.go
@@ -207,6 +207,10 @@ func (plugin *FakeVolumePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *FakeVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *FakeVolumePlugin) NewMounter(spec *Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
 	plugin.Lock()
 	defer plugin.Unlock()
diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD
index 6bed7977c83..c1e67a0cdd8 100644
--- a/pkg/volume/util/operationexecutor/BUILD
+++ b/pkg/volume/util/operationexecutor/BUILD
@@ -40,6 +40,7 @@ go_test(
     deps = [
        "//pkg/api/v1:go_default_library",
        "//pkg/util/mount:go_default_library",
+       "//pkg/volume:go_default_library",
        "//pkg/volume/util/types:go_default_library",
        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
        "//vendor:k8s.io/apimachinery/pkg/types",
diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go
index b301365df72..8e28405786b 100644
--- a/pkg/volume/util/operationexecutor/operation_executor.go
+++ b/pkg/volume/util/operationexecutor/operation_executor.go
@@ -24,6 +24,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/golang/glog"
+
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/util/mount"
@@ -59,12 +61,16 @@ type OperationExecutor interface {
 	// It then updates the actual state of the world to reflect that.
 	AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
 
-	// VerifyVolumesAreAttached verifies the given list of volumes to see whether they are still attached to the node.
+	// VerifyVolumesAreAttachedPerNode verifies the given list of volumes to see whether they are still attached to the node.
 	// If any volume is not attached right now, it will update the actual state of the world to reflect that.
 	// Note that this operation could be operated concurrently with other attach/detach operations.
 	// In theory (but very unlikely in practise), race condition among these operations might mark volume as detached
 	// even if it is attached. But reconciler can correct this in a short period of time.
-	VerifyVolumesAreAttached(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
+	VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
+
+	// VerifyVolumesAreAttached verifies the volumes in use across the entire cluster and checks whether they are still attached to their nodes.
+	// If any volume is not attached right now, it will update the actual state of the world to reflect that.
+	VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)
 
 	// DetachVolume detaches the volume from the node specified in
 	// volumeToDetach, and updates the actual state of the world to reflect
@@ -387,8 +393,84 @@ func (oe *operationExecutor) DetachVolume(
 	return oe.pendingOperations.Run(
 		volumeToDetach.VolumeName, "" /* podName */, detachFunc)
 }
-
 func (oe *operationExecutor) VerifyVolumesAreAttached(
+	attachedVolumes map[types.NodeName][]AttachedVolume,
+	actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
+
+	// A map of plugin names and the nodes on which they exist, with the volumes they manage
+	bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
+	volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
+
+	for node, nodeAttachedVolumes := range attachedVolumes {
+		for _, volumeAttached := range nodeAttachedVolumes {
+			volumePlugin, err :=
+				oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
+
+			if err != nil || volumePlugin == nil {
+				glog.Errorf(
+					"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
+					volumeAttached.VolumeName,
+					volumeAttached.VolumeSpec.Name(),
+					volumeAttached.NodeName,
+					err)
+				continue
+			}
+
+			pluginName := volumePlugin.GetPluginName()
+
+			if volumePlugin.SupportsBulkVolumeVerification() {
+				pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
+
+				if !pluginNodesExist {
+					pluginNodes = make(map[types.NodeName][]*volume.Spec)
+				}
+
+				volumeSpecList, nodeExists := pluginNodes[node]
+				if !nodeExists {
+					volumeSpecList = []*volume.Spec{}
+				}
+				volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
+				pluginNodes[node] = volumeSpecList
+
+				bulkVerifyPluginsByNode[pluginName] = pluginNodes
+				volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
+
+				if !mapExists {
+					volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
+				}
+				volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
+				volumeSpecMapByPlugin[pluginName] = volumeSpecMap
+				continue
+			}
+
+			// If a plugin doesn't support bulk volume polling, poll this node's volumes individually and move on to the next node
+			nodeError := oe.VerifyVolumesAreAttachedPerNode(nodeAttachedVolumes, node, actualStateOfWorld)
+			if nodeError != nil {
+				glog.Errorf("BulkVerifyVolumes.VerifyVolumesAreAttachedPerNode error verifying volumes on node %q with %v", node, nodeError)
+			}
+			break
+		}
+	}
+
+	for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
+		bulkVerifyVolumeFunc, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
+			pluginNodeVolumes,
+			pluginName,
+			volumeSpecMapByPlugin[pluginName],
+			actualStateOfWorld)
+		if err != nil {
+			glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
+		}
+		// Ugly hack to ensure we don't do parallel bulk polling of the same volume plugin
+		uniquePluginName := v1.UniqueVolumeName(pluginName)
+		err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc)
+		if err != nil {
+			glog.Errorf("BulkVerifyVolumes.Run error running bulk volume verification for plugin %q with %v", pluginName, err)
+		}
+	}
+}
+
+func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
 	attachedVolumes []AttachedVolume,
 	nodeName types.NodeName,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
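
Reviewer note: the two bookkeeping maps built in VerifyVolumesAreAttached can be hard to picture from the loop above; this is the shape they end up with, assuming (hypothetically) that only the EBS plugin reports bulk support and that specA/specB/specC are *volume.Spec values.

// Sketch only - not part of the patch.
// plugin name -> node -> specs to verify on that node
bulkVerifyPluginsByNode := map[string]map[types.NodeName][]*volume.Spec{
	"kubernetes.io/aws-ebs": {
		"node-a": {specA, specB},
		"node-b": {specC},
	},
}
// plugin name -> spec -> unique volume name, used later to mark volumes detached
volumeSpecMapByPlugin := map[string]map[*volume.Spec]v1.UniqueVolumeName{
	"kubernetes.io/aws-ebs": {
		specA: "unique-name-a",
		specB: "unique-name-b",
		specC: "unique-name-c",
	},
}
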
diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go
index e9932873667..87400c8766f 100644
--- a/pkg/volume/util/operationexecutor/operation_executor_test.go
+++ b/pkg/volume/util/operationexecutor/operation_executor_test.go
@@ -26,6 +26,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/util/mount"
+	"k8s.io/kubernetes/pkg/volume"
 	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
 )
 
@@ -197,7 +198,7 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
 
 	// Act
 	for i := 0; i < numVolumesToVerifyAttached; i++ {
-		oe.VerifyVolumesAreAttached(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
+		oe.VerifyVolumesAreAttachedPerNode(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
 	}
 
 	// Assert
@@ -281,6 +282,21 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v
 	}, nil
 }
 
+func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
+	pluginNodeVolumes map[types.NodeName][]*volume.Spec,
+	pluginName string,
+	volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
+	actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (func() error, error) {
+	return func() error {
+		startOperationAndBlock(fopg.ch, fopg.quit)
+		return nil
+	}, nil
+}
+
+func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
+	return nil
+}
+
 func getTestPodWithSecret(podName, secretName string) *v1.Pod {
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go
index b85f83b6a4e..f1aff52c818 100644
--- a/pkg/volume/util/operationexecutor/operation_generator.go
+++ b/pkg/volume/util/operationexecutor/operation_generator.go
@@ -88,6 +88,14 @@
 
 	// Generates the function needed to check if the attach_detach controller has attached the volume plugin
 	GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error)
+
+	// GetVolumePluginMgr returns the volume plugin manager
+	GetVolumePluginMgr() *volume.VolumePluginMgr
+
+	GenerateBulkVolumeVerifyFunc(
+		map[types.NodeName][]*volume.Spec,
+		string,
+		map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (func() error, error)
 }
 
 func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
@@ -167,6 +175,71 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
 	}, nil
 }
 
+func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
+	pluginNodeVolumes map[types.NodeName][]*volume.Spec,
+	pluginName string,
+	volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
+	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
+
+	return func() error {
+		attachableVolumePlugin, err :=
+			og.volumePluginMgr.FindAttachablePluginByName(pluginName)
+		if err != nil || attachableVolumePlugin == nil {
+			glog.Errorf(
+				"BulkVerifyVolume.FindAttachablePluginByName failed for plugin %q with: %v",
+				pluginName,
+				err)
+			return nil
+		}
+
+		volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
+
+		if newAttacherErr != nil {
+			glog.Errorf(
+				"BulkVerifyVolumes failed to get an attacher for plugin %q with: %v",
+				attachableVolumePlugin,
+				newAttacherErr)
+			return nil
+		}
+		bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier)
+
+		if !ok {
+			glog.Errorf("BulkVerifyVolume failed to type assert attacher %q to volume.BulkVolumeVerifier", volumeAttacher)
+			return nil
+		}
+
+		attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes)
+		if bulkAttachErr != nil {
+			glog.Errorf("BulkVerifyVolume.BulkVerifyVolumes error checking if volumes are attached with %v", bulkAttachErr)
+			return nil
+		}
+
+		for nodeName, volumeSpecs := range pluginNodeVolumes {
+			for _, volumeSpec := range volumeSpecs {
+				nodeVolumeSpecs, nodeChecked := attached[nodeName]
+
+				if !nodeChecked {
+					glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes returned no result for node %q - leaving volume %q as attached",
+						nodeName,
+						volumeSpec.Name())
+					continue
+				}
+
+				check := nodeVolumeSpecs[volumeSpec]
+
+				if !check {
+					glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes: node %q no longer has volume %q attached",
+						nodeName,
+						volumeSpec.Name())
+					actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName)
+				}
+			}
+		}
+
+		return nil
+	}, nil
+}
+
 func (og *operationGenerator) GenerateAttachVolumeFunc(
 	volumeToAttach VolumeToAttach,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
@@ -233,6 +306,10 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
 	}, nil
 }
 
+func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
+	return og.volumePluginMgr
+}
+
 func (og *operationGenerator) GenerateDetachVolumeFunc(
 	volumeToDetach AttachedVolume,
 	verifySafeToDetach bool,
glog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier) + return nil + } + + attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes) + if bulkAttachErr != nil { + glog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr) + return nil + } + + for nodeName, volumeSpecs := range pluginNodeVolumes { + for _, volumeSpec := range volumeSpecs { + nodeVolumeSpecs, nodeChecked := attached[nodeName] + + if !nodeChecked { + glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and leaving volume %q as attached", + nodeName, + volumeSpec.Name()) + continue + } + + check := nodeVolumeSpecs[volumeSpec] + + if !check { + glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and volume %q", + nodeName, + volumeSpec.Name()) + actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName) + } + } + } + + return nil + }, nil +} + func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { @@ -233,6 +306,10 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( }, nil } +func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { + return og.volumePluginMgr +} + func (og *operationGenerator) GenerateDetachVolumeFunc( volumeToDetach AttachedVolume, verifySafeToDetach bool, diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index e0499fdd660..8ec9affd374 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -187,6 +187,14 @@ type Attacher interface { MountDevice(spec *Spec, devicePath string, deviceMountPath string) error } +type BulkVolumeVerifier interface { + // BulkVerifyVolumes checks whether the list of volumes still attached to the + // the clusters in the node. It returns a map which maps from the volume spec to the checking result. + // If an error occurs during check - error should be returned and volume on nodes + // should be assumed as still attached. + BulkVerifyVolumes(volumesByNode map[types.NodeName][]*Spec) (map[types.NodeName]map[*Spec]bool, error) +} + // Detacher can detach a volume from a node. type Detacher interface { // Detach the given device from the node with the given Name. 
diff --git a/pkg/volume/vsphere_volume/vsphere_volume.go b/pkg/volume/vsphere_volume/vsphere_volume.go
index d25edac7e83..ac72babe9ec 100644
--- a/pkg/volume/vsphere_volume/vsphere_volume.go
+++ b/pkg/volume/vsphere_volume/vsphere_volume.go
@@ -84,6 +84,10 @@ func (plugin *vsphereVolumePlugin) SupportsMountOption() bool {
 	return true
 }
 
+func (plugin *vsphereVolumePlugin) SupportsBulkVolumeVerification() bool {
+	return false
+}
+
 func (plugin *vsphereVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
 	return plugin.newMounterInternal(spec, pod.UID, &VsphereDiskUtil{}, plugin.host.GetMounter())
 }
diff --git a/plugin/pkg/admission/persistentvolume/label/admission_test.go b/plugin/pkg/admission/persistentvolume/label/admission_test.go
index 632ded64fe8..2094edd00ff 100644
--- a/plugin/pkg/admission/persistentvolume/label/admission_test.go
+++ b/plugin/pkg/admission/persistentvolume/label/admission_test.go
@@ -63,7 +63,7 @@ func (c *mockVolumes) DiskIsAttached(volumeName aws.KubernetesVolumeID, nodeName
 	return false, fmt.Errorf("not implemented")
 }
 
-func (c *mockVolumes) DisksAreAttached(diskNames []aws.KubernetesVolumeID, nodeName types.NodeName) (map[aws.KubernetesVolumeID]bool, error) {
+func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {
 	return nil, fmt.Errorf("not implemented")
 }