Merge pull request #41306 from gnufied/implement-interface-bulk-volume-poll

Automatic merge from submit-queue (batch tested with PRs 41306, 42187, 41666, 42275, 42266)

Implement bulk polling of volumes

This implements Bulk volume polling using ideas presented by
justin in https://github.com/kubernetes/kubernetes/pull/39564

But it changes the implementation to use an interface
and doesn't affect other implementations.

cc @justinsb
This commit is contained in:
Kubernetes Submit Queue 2017-03-03 10:54:38 -08:00 committed by GitHub
commit e9bbfb81c1
38 changed files with 457 additions and 63 deletions

View File

@ -330,8 +330,8 @@ type Volumes interface {
// Check if the volume is already attached to the node with the specified NodeName
DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeName) (bool, error)
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []KubernetesVolumeID, nodeName types.NodeName) (map[KubernetesVolumeID]bool, error)
// Check if disks specified in argument map are still attached to their respective nodes.
DisksAreAttached(map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error)
}
// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@ -1777,36 +1777,66 @@ func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeN
return false, nil
}
func (c *Cloud) DisksAreAttached(diskNames []KubernetesVolumeID, nodeName types.NodeName) (map[KubernetesVolumeID]bool, error) {
idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
attached := make(map[KubernetesVolumeID]bool)
for _, diskName := range diskNames {
volumeID, err := diskName.mapToAWSVolumeID()
if err != nil {
return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}
idToDiskName[volumeID] = diskName
attached[diskName] = false
func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error) {
attached := make(map[types.NodeName]map[KubernetesVolumeID]bool)
if len(nodeDisks) == 0 {
return attached, nil
}
_, instance, err := c.getFullInstance(nodeName)
dnsNameSlice := []string{}
for nodeName, diskNames := range nodeDisks {
for _, diskName := range diskNames {
setNodeDisk(attached, diskName, nodeName, false)
}
dnsNameSlice = append(dnsNameSlice, mapNodeNameToPrivateDNSName(nodeName))
}
awsInstances, err := c.getInstancesByNodeNames(dnsNameSlice)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// When there is an error fetching instance information
// it is safer to return nil and let volume information not be touched.
return nil, err
}
if len(awsInstances) == 0 {
glog.V(2).Infof("DisksAreAttached will assume no disks are attached to any node on AWS cluster.")
return attached, nil
}
awsInstanceMap := make(map[types.NodeName]*ec2.Instance)
for _, awsInstance := range awsInstances {
awsInstanceMap[mapInstanceToNodeName(awsInstance)] = awsInstance
}
// Note that we check that the volume is attached to the correct node, not that it is attached to _a_ node
for nodeName, diskNames := range nodeDisks {
awsInstance := awsInstanceMap[nodeName]
if awsInstance == nil {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
nodeName,
diskNames)
return attached, nil
continue
}
return attached, err
}
for _, blockDevice := range instance.BlockDeviceMappings {
volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
diskName, found := idToDiskName[volumeID]
if found {
// Disk is still attached to node
attached[diskName] = true
idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
for _, diskName := range diskNames {
volumeID, err := diskName.mapToAWSVolumeID()
if err != nil {
return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}
idToDiskName[volumeID] = diskName
}
for _, blockDevice := range awsInstance.BlockDeviceMappings {
volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
diskName, found := idToDiskName[volumeID]
if found {
// Disk is still attached to node
setNodeDisk(attached, diskName, nodeName, true)
}
}
}
@ -3146,7 +3176,24 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
return c.lastInstancesByNodeNames, nil
}
}
names := aws.StringSlice(nodeNames.List())
instances, err := c.getInstancesByNodeNames(nodeNames.List())
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, nil
}
glog.V(2).Infof("Caching instances for %v", nodeNames)
c.lastNodeNames = nodeNames
c.lastInstancesByNodeNames = instances
return instances, nil
}
func (c *Cloud) getInstancesByNodeNames(nodeNames []string) ([]*ec2.Instance, error) {
names := aws.StringSlice(nodeNames)
nodeNameFilter := &ec2.Filter{
Name: aws.String("private-dns-name"),
@ -3168,10 +3215,6 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins
glog.V(3).Infof("Failed to find any instances %v", nodeNames)
return nil, nil
}
glog.V(2).Infof("Caching instances for %v", nodeNames)
c.lastNodeNames = nodeNames
c.lastInstancesByNodeNames = instances
return instances, nil
}
@ -3251,3 +3294,18 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
awsInstance := newAWSInstance(c.ec2, instance)
return awsInstance, instance, err
}
func setNodeDisk(
nodeDiskMap map[types.NodeName]map[KubernetesVolumeID]bool,
volumeID KubernetesVolumeID,
nodeName types.NodeName,
check bool) {
volumeMap := nodeDiskMap[nodeName]
if volumeMap == nil {
volumeMap = make(map[KubernetesVolumeID]bool)
nodeDiskMap[nodeName] = volumeMap
}
volumeMap[volumeID] = check
}

View File

@ -121,12 +121,7 @@ func (rc *reconciler) updateSyncTime() {
func (rc *reconciler) syncStates() {
volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
for nodeName, volumes := range volumesPerNode {
err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld)
if err != nil {
glog.Errorf("Error in syncing states for volumes: %v", err)
}
}
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
}
func (rc *reconciler) reconcile() {

View File

@ -1118,6 +1118,10 @@ func (plugin *mockVolumePlugin) SupportsMountOption() bool {
return false
}
func (plugin *mockVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol.Spec, error) {
return nil, nil
}

View File

@ -78,37 +78,67 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, nodeName
}
func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
volumeIDList := []aws.KubernetesVolumeID{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
name := aws.KubernetesVolumeID(volumeSource.VolumeID)
volumeIDList = append(volumeIDList, name)
volumesAttachedCheck[spec] = true
volumeSpecMap[name] = spec
glog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for AWS", nodeName)
volumeNodeMap := map[types.NodeName][]*volume.Spec{
nodeName: specs,
}
attachedResult, err := attacher.awsVolumes.DisksAreAttached(volumeIDList, nodeName)
nodeVolumesResult := make(map[*volume.Spec]bool)
nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
if err != nil {
// Log error and continue with attach
glog.Errorf(
"Error checking if volumes (%v) is already attached to current node (%q). err=%v",
volumeIDList, nodeName, err)
glog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
return nodeVolumesResult, err
}
if result, ok := nodesVerificationMap[nodeName]; ok {
return result, nil
}
return nodeVolumesResult, nil
}
func (attacher *awsElasticBlockStoreAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
diskNamesByNode := make(map[types.NodeName][]aws.KubernetesVolumeID)
volumeSpecMap := make(map[aws.KubernetesVolumeID]*volume.Spec)
for nodeName, volumeSpecs := range volumesByNode {
for _, volumeSpec := range volumeSpecs {
volumeSource, _, err := getVolumeSource(volumeSpec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
continue
}
name := aws.KubernetesVolumeID(volumeSource.VolumeID)
diskNamesByNode[nodeName] = append(diskNamesByNode[nodeName], name)
nodeDisk, nodeDiskExists := volumesAttachedCheck[nodeName]
if !nodeDiskExists {
nodeDisk = make(map[*volume.Spec]bool)
}
nodeDisk[volumeSpec] = true
volumeSpecMap[name] = volumeSpec
volumesAttachedCheck[nodeName] = nodeDisk
}
}
attachedResult, err := attacher.awsVolumes.DisksAreAttached(diskNamesByNode)
if err != nil {
glog.Errorf("Error checking if volumes are attached to nodes err = %v", err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
for nodeName, nodeDisks := range attachedResult {
for diskName, attached := range nodeDisks {
if !attached {
spec := volumeSpecMap[diskName]
setNodeDisk(volumesAttachedCheck, spec, nodeName, false)
}
}
}
return volumesAttachedCheck, nil
}
@ -249,3 +279,17 @@ func (detacher *awsElasticBlockStoreDetacher) Detach(deviceMountPath string, nod
func (detacher *awsElasticBlockStoreDetacher) UnmountDevice(deviceMountPath string) error {
return volumeutil.UnmountPath(deviceMountPath, detacher.mounter)
}
func setNodeDisk(
nodeDiskMap map[types.NodeName]map[*volume.Spec]bool,
volumeSpec *volume.Spec,
nodeName types.NodeName,
check bool) {
volumeMap := nodeDiskMap[nodeName]
if volumeMap == nil {
volumeMap = make(map[*volume.Spec]bool)
nodeDiskMap[nodeName] = volumeMap
}
volumeMap[volumeSpec] = check
}

View File

@ -321,7 +321,7 @@ func (testcase *testcase) DiskIsAttached(diskName aws.KubernetesVolumeID, nodeNa
return expected.isAttached, expected.ret
}
func (testcase *testcase) DisksAreAttached(diskNames []aws.KubernetesVolumeID, nodeName types.NodeName) (map[aws.KubernetesVolumeID]bool, error) {
func (testcase *testcase) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {
return nil, errors.New("Not implemented")
}

View File

@ -91,6 +91,10 @@ func (plugin *awsElasticBlockStorePlugin) SupportsMountOption() bool {
return true
}
func (plugin *awsElasticBlockStorePlugin) SupportsBulkVolumeVerification() bool {
return true
}
func (plugin *awsElasticBlockStorePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -107,6 +107,10 @@ func (plugin *azureDataDiskPlugin) SupportsMountOption() bool {
return true
}
func (plugin *azureDataDiskPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *azureDataDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -83,6 +83,10 @@ func (plugin *azureFilePlugin) SupportsMountOption() bool {
return true
}
func (plugin *azureFilePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *azureFilePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -76,6 +76,10 @@ func (plugin *cephfsPlugin) SupportsMountOption() bool {
return true
}
func (plugin *cephfsPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *cephfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -101,6 +101,10 @@ func (plugin *cinderPlugin) RequiresRemount() bool {
func (plugin *cinderPlugin) SupportsMountOption() bool {
return true
}
func (plugin *cinderPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *cinderPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {

View File

@ -80,6 +80,10 @@ func (plugin *configMapPlugin) SupportsMountOption() bool {
return false
}
func (plugin *configMapPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *configMapPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return &configMapVolumeMounter{
configMapVolume: &configMapVolume{spec.Name(), pod.UID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}},

View File

@ -86,6 +86,10 @@ func (plugin *downwardAPIPlugin) SupportsMountOption() bool {
return false
}
func (plugin *downwardAPIPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *downwardAPIPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
v := &downwardAPIVolume{
volName: spec.Name(),

View File

@ -94,6 +94,10 @@ func (plugin *emptyDirPlugin) SupportsMountOption() bool {
return false
}
func (plugin *emptyDirPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *emptyDirPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod, plugin.host.GetMounter(), &realMountDetector{plugin.host.GetMounter()}, opts)
}

View File

@ -82,6 +82,10 @@ func (plugin *fcPlugin) SupportsMountOption() bool {
return false
}
func (plugin *fcPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *fcPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -177,6 +177,10 @@ func (plugin *flexVolumePlugin) unsupported(commands ...string) {
plugin.unsupportedCommands = append(plugin.unsupportedCommands, commands...)
}
func (plugin *flexVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
}
// Returns true iff the given command is known to be unsupported.
func (plugin *flexVolumePlugin) isUnsupported(command string) bool {
plugin.Lock()

View File

@ -116,6 +116,10 @@ func (p *flockerPlugin) SupportsMountOption() bool {
return false
}
func (plugin *flockerPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -86,6 +86,10 @@ func (plugin *gcePersistentDiskPlugin) SupportsMountOption() bool {
return true
}
func (plugin *gcePersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *gcePersistentDiskPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -85,6 +85,10 @@ func (plugin *gitRepoPlugin) SupportsMountOption() bool {
return false
}
func (plugin *gitRepoPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *gitRepoPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return &gitRepoVolumeMounter{
gitRepoVolume: &gitRepoVolume{

View File

@ -119,6 +119,10 @@ func (plugin *glusterfsPlugin) SupportsMountOption() bool {
return true
}
func (plugin *glusterfsPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *glusterfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -87,6 +87,10 @@ func (plugin *hostPathPlugin) SupportsMountOption() bool {
return false
}
func (plugin *hostPathPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *hostPathPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -86,6 +86,10 @@ func (plugin *iscsiPlugin) SupportsMountOption() bool {
return true
}
func (plugin *iscsiPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *iscsiPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -92,6 +92,10 @@ func (plugin *nfsPlugin) SupportsMountOption() bool {
return true
}
func (plugin *nfsPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *nfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -83,6 +83,10 @@ func (plugin *photonPersistentDiskPlugin) SupportsMountOption() bool {
return true
}
func (plugin *photonPersistentDiskPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *photonPersistentDiskPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod.UID, &PhotonDiskUtil{}, plugin.host.GetMounter())
}

View File

@ -112,6 +112,11 @@ type VolumePlugin interface {
// Specifying mount options in a volume plugin that doesn't support
// user specified mount options will result in error creating persistent volumes
SupportsMountOption() bool
// SupportsBulkVolumeVerification checks if volume plugin type is capable
// of enabling bulk polling of all nodes. This can speed up verification of
// attached volumes by quite a bit, but underlying pluging must support it.
SupportsBulkVolumeVerification() bool
}
// PersistentVolumePlugin is an extended interface of VolumePlugin and is used

View File

@ -81,6 +81,10 @@ func (plugin *testPlugins) SupportsMountOption() bool {
return false
}
func (plugin *testPlugins) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *testPlugins) NewMounter(spec *Spec, podRef *v1.Pod, opts VolumeOptions) (Mounter, error) {
return nil, nil
}

View File

@ -179,6 +179,10 @@ func (plugin *portworxVolumePlugin) SupportsMountOption() bool {
return false
}
func (plugin *portworxVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func getVolumeSource(
spec *volume.Spec) (*v1.PortworxVolumeSource, bool, error) {
if spec.Volume != nil && spec.Volume.PortworxVolume != nil {

View File

@ -96,6 +96,10 @@ func (plugin *projectedPlugin) SupportsMountOption() bool {
return false
}
func (plugin *projectedPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *projectedPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return &projectedVolumeMounter{
projectedVolume: &projectedVolume{

View File

@ -122,6 +122,10 @@ func (plugin *quobytePlugin) SupportsMountOption() bool {
return true
}
func (plugin *quobytePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *quobytePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -90,6 +90,10 @@ func (plugin *rbdPlugin) SupportsMountOption() bool {
return true
}
func (plugin *rbdPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *rbdPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,

View File

@ -89,6 +89,10 @@ func (plugin *secretPlugin) SupportsMountOption() bool {
return false
}
func (plugin *secretPlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *secretPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return &secretVolumeMounter{
secretVolume: &secretVolume{

View File

@ -207,6 +207,10 @@ func (plugin *FakeVolumePlugin) SupportsMountOption() bool {
return true
}
func (plugin *FakeVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *FakeVolumePlugin) NewMounter(spec *Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
plugin.Lock()
defer plugin.Unlock()

View File

@ -40,6 +40,7 @@ go_test(
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",

View File

@ -24,6 +24,8 @@ import (
"strings"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/mount"
@ -59,12 +61,16 @@ type OperationExecutor interface {
// It then updates the actual state of the world to reflect that.
AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// VerifyVolumesAreAttached verifies the given list of volumes to see whether they are still attached to the node.
// VerifyVolumesAreAttachedPerNode verifies the given list of volumes to see whether they are still attached to the node.
// If any volume is not attached right now, it will update the actual state of the world to reflect that.
// Note that this operation could be operated concurrently with other attach/detach operations.
// In theory (but very unlikely in practise), race condition among these operations might mark volume as detached
// even if it is attached. But reconciler can correct this in a short period of time.
VerifyVolumesAreAttached(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// VerifyVolumesAreAttached verifies volumes being used in entire cluster and if they are still attached to the node
// If any volume is not attached right now, it will update actual state of world to reflect that.
VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)
// DetachVolume detaches the volume from the node specified in
// volumeToDetach, and updates the actual state of the world to reflect
@ -387,8 +393,84 @@ func (oe *operationExecutor) DetachVolume(
return oe.pendingOperations.Run(
volumeToDetach.VolumeName, "" /* podName */, detachFunc)
}
func (oe *operationExecutor) VerifyVolumesAreAttached(
attachedVolumes map[types.NodeName][]AttachedVolume,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
// A map of plugin names and nodes on which they exist with volumes they manage
bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
for node, nodeAttachedVolumes := range attachedVolumes {
for _, volumeAttached := range nodeAttachedVolumes {
volumePlugin, err :=
oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
if err != nil || volumePlugin == nil {
glog.Errorf(
"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
volumeAttached.VolumeName,
volumeAttached.VolumeSpec.Name(),
volumeAttached.NodeName,
err)
continue
}
pluginName := volumePlugin.GetPluginName()
if volumePlugin.SupportsBulkVolumeVerification() {
pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
if !pluginNodesExist {
pluginNodes = make(map[types.NodeName][]*volume.Spec)
}
volumeSpecList, nodeExists := pluginNodes[node]
if !nodeExists {
volumeSpecList = []*volume.Spec{}
}
volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
pluginNodes[node] = volumeSpecList
bulkVerifyPluginsByNode[pluginName] = pluginNodes
volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
if !mapExists {
volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
}
volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
volumeSpecMapByPlugin[pluginName] = volumeSpecMap
continue
}
// If node doesn't support Bulk volume polling it is best to poll individually
nodeError := oe.VerifyVolumesAreAttachedPerNode(nodeAttachedVolumes, node, actualStateOfWorld)
if nodeError != nil {
glog.Errorf("BulkVerifyVolumes.VerifyVolumesAreAttached verifying volumes on node %q with %v", node, nodeError)
}
break
}
}
for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
bulkVerifyVolumeFunc, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
pluginNodeVolumes,
pluginName,
volumeSpecMapByPlugin[pluginName],
actualStateOfWorld)
if err != nil {
glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
}
// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
uniquePluginName := v1.UniqueVolumeName(pluginName)
err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc)
if err != nil {
glog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
}
}
}
func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
attachedVolumes []AttachedVolume,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
@ -197,7 +198,7 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
// Act
for i := 0; i < numVolumesToVerifyAttached; i++ {
oe.VerifyVolumesAreAttached(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
oe.VerifyVolumesAreAttachedPerNode(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
}
// Assert
@ -281,6 +282,21 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v
}, nil
}
func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
pluginNodeVolumes map[types.NodeName][]*volume.Spec,
pluginNane string,
volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (func() error, error) {
return func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}, nil
}
func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
return nil
}
func getTestPodWithSecret(podName, secretName string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

View File

@ -88,6 +88,14 @@ type OperationGenerator interface {
// Generates the function needed to check if the attach_detach controller has attached the volume plugin
GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error)
// GetVolumePluginMgr returns volume plugin manager
GetVolumePluginMgr() *volume.VolumePluginMgr
GenerateBulkVolumeVerifyFunc(
map[types.NodeName][]*volume.Spec,
string,
map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (func() error, error)
}
func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
@ -167,6 +175,71 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
}, nil
}
func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
pluginNodeVolumes map[types.NodeName][]*volume.Spec,
pluginName string,
volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
return func() error {
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
glog.Errorf(
"BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v",
pluginName,
err)
return nil
}
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
if newAttacherErr != nil {
glog.Errorf(
"BulkVerifyVolumes failed for getting plugin %q with: %v",
attachableVolumePlugin,
newAttacherErr)
return nil
}
bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier)
if !ok {
glog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier)
return nil
}
attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes)
if bulkAttachErr != nil {
glog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr)
return nil
}
for nodeName, volumeSpecs := range pluginNodeVolumes {
for _, volumeSpec := range volumeSpecs {
nodeVolumeSpecs, nodeChecked := attached[nodeName]
if !nodeChecked {
glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and leaving volume %q as attached",
nodeName,
volumeSpec.Name())
continue
}
check := nodeVolumeSpecs[volumeSpec]
if !check {
glog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and volume %q",
nodeName,
volumeSpec.Name())
actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName)
}
}
}
return nil
}, nil
}
func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
@ -233,6 +306,10 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}, nil
}
func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
return og.volumePluginMgr
}
func (og *operationGenerator) GenerateDetachVolumeFunc(
volumeToDetach AttachedVolume,
verifySafeToDetach bool,

View File

@ -187,6 +187,14 @@ type Attacher interface {
MountDevice(spec *Spec, devicePath string, deviceMountPath string) error
}
type BulkVolumeVerifier interface {
// BulkVerifyVolumes checks whether the list of volumes still attached to the
// the clusters in the node. It returns a map which maps from the volume spec to the checking result.
// If an error occurs during check - error should be returned and volume on nodes
// should be assumed as still attached.
BulkVerifyVolumes(volumesByNode map[types.NodeName][]*Spec) (map[types.NodeName]map[*Spec]bool, error)
}
// Detacher can detach a volume from a node.
type Detacher interface {
// Detach the given device from the node with the given Name.

View File

@ -84,6 +84,10 @@ func (plugin *vsphereVolumePlugin) SupportsMountOption() bool {
return true
}
func (plugin *vsphereVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *vsphereVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod.UID, &VsphereDiskUtil{}, plugin.host.GetMounter())
}

View File

@ -63,7 +63,7 @@ func (c *mockVolumes) DiskIsAttached(volumeName aws.KubernetesVolumeID, nodeName
return false, fmt.Errorf("not implemented")
}
func (c *mockVolumes) DisksAreAttached(diskNames []aws.KubernetesVolumeID, nodeName types.NodeName) (map[aws.KubernetesVolumeID]bool, error) {
func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {
return nil, fmt.Errorf("not implemented")
}