From abbde4337485d0286ab51bbebff54ff02eb33694 Mon Sep 17 00:00:00 2001 From: Jing Xu Date: Fri, 14 Oct 2016 14:21:58 -0700 Subject: [PATCH] Add sync state loop in master's volume reconciler At the master volume reconciler, the information about which volumes are attached to nodes is cached in the actual state of the world. However, this information might be out of date if a node is terminated (its volumes are detached automatically). In this situation, the reconciler assumes the volume is still attached and will not issue an attach operation when the node comes back. Pods created on those nodes will fail to mount. This PR adds logic to periodically sync the truth for attached volumes kept in the actual state cache. If a volume is no longer attached to the node, the actual state is updated to reflect the truth. In turn, the reconciler will take action if needed. To avoid issuing many concurrent operations against the cloud provider, this PR adds a batch operation to check whether a list of volumes is attached to a node, instead of making one request per volume. 
More details are explained in PR #33760 --- pkg/cloudprovider/providers/aws/aws.go | 38 +++ .../providers/azure/azure_storage.go | 28 +++ pkg/cloudprovider/providers/gce/gce.go | 36 +++ .../providers/openstack/openstack_volumes.go | 18 ++ .../providers/rackspace/rackspace.go | 21 ++ .../providers/vsphere/vsphere.go | 65 ++++- .../attachdetach/attach_detach_controller.go | 5 + .../cache/actual_state_of_world.go | 21 ++ .../attachdetach/reconciler/reconciler.go | 235 ++++++++++-------- .../reconciler/reconciler_test.go | 11 +- pkg/volume/aws_ebs/attacher.go | 34 +++ pkg/volume/aws_ebs/attacher_test.go | 4 + pkg/volume/azure_dd/attacher.go | 34 +++ pkg/volume/azure_dd/azure_dd.go | 2 + pkg/volume/cinder/attacher.go | 34 +++ pkg/volume/cinder/attacher_test.go | 4 + pkg/volume/cinder/cinder.go | 1 + pkg/volume/gce_pd/attacher.go | 34 +++ pkg/volume/gce_pd/attacher_test.go | 4 + pkg/volume/testing/testing.go | 6 + .../nestedpendingoperations.go | 9 + .../operationexecutor/operation_executor.go | 97 ++++++++ pkg/volume/volume.go | 5 + pkg/volume/vsphere_volume/attacher.go | 33 +++ pkg/volume/vsphere_volume/attacher_test.go | 4 + .../persistentvolume/label/admission_test.go | 4 + 26 files changed, 679 insertions(+), 108 deletions(-) diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index bc2968526a0..ae58e72d3a8 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -321,6 +321,9 @@ type Volumes interface { // Check if the volume is already attached to the node with the specified NodeName DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) + + // Check if a list of volumes are attached to the node with the specified NodeName + DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) } // InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups @@ -1774,6 +1777,41 @@ func (c *Cloud) 
DiskIsAttached(diskName string, nodeName types.NodeName) (bool, return false, nil } +func (c *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + awsInstance, err := c.getAwsInstance(nodeName) + if err != nil { + if err == cloudprovider.InstanceNotFound { + // If instance no longer exists, safe to assume volume is not attached. + glog.Warningf( + "Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.", + nodeName, + diskNames) + return attached, nil + } + + return attached, err + } + info, err := awsInstance.describeInstance() + if err != nil { + return attached, err + } + for _, blockDevice := range info.BlockDeviceMappings { + volumeId := aws.StringValue(blockDevice.Ebs.VolumeId) + for _, diskName := range diskNames { + if volumeId == diskName { + // Disk is still attached to node + attached[diskName] = true + } + } + } + + return attached, nil +} + // Gets the current load balancer state func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) { request := &elb.DescribeLoadBalancersInput{} diff --git a/pkg/cloudprovider/providers/azure/azure_storage.go b/pkg/cloudprovider/providers/azure/azure_storage.go index 7f9d5276a18..d16c6a65840 100644 --- a/pkg/cloudprovider/providers/azure/azure_storage.go +++ b/pkg/cloudprovider/providers/azure/azure_storage.go @@ -75,6 +75,34 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l return err } +// DisksAreAttached checks if a list of volumes are attached to the node with the specified NodeName +func (az *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + vm, exists, err := az.getVirtualMachine(nodeName) + if !exists { + // if 
host doesn't exist, no need to detach + glog.Warningf("Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.", + nodeName, diskNames) + return attached, nil + } else if err != nil { + return attached, err + } + + disks := *vm.Properties.StorageProfile.DataDisks + for _, disk := range disks { + for _, diskName := range diskNames { + if disk.Name != nil && diskName != "" && *disk.Name == diskName { + attached[diskName] = true + } + } + } + + return attached, nil +} + // DetachDiskByName detaches a vhd from host // the vhd can be identified by diskName or diskURI func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error { diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index dcc1d74f347..126feec37d9 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -133,6 +133,10 @@ type Disks interface { // DiskIsAttached checks if a disk is attached to the node with the specified NodeName. DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) + // DisksAreAttached is a batch function to check if a list of disks are attached + // to the node with the specified NodeName. + DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) + // CreateDisk creates a new PD with given properties. Tags are serialized // as JSON into Description field. 
CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error @@ -2651,6 +2655,38 @@ func (gce *GCECloud) DiskIsAttached(diskName string, nodeName types.NodeName) (b return false, nil } +func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + instanceName := mapNodeNameToInstanceName(nodeName) + instance, err := gce.getInstanceByName(instanceName) + if err != nil { + if err == cloudprovider.InstanceNotFound { + // If instance no longer exists, safe to assume volume is not attached. + glog.Warningf( + "Instance %q does not exist. DisksAreAttached will assume PD %v are not attached to it.", + instanceName, + diskNames) + return attached, nil + } + + return attached, err + } + + for _, instanceDisk := range instance.Disks { + for _, diskName := range diskNames { + if instanceDisk.DeviceName == diskName { + // Disk is still attached to node + attached[diskName] = true + } + } + } + + return attached, nil +} + // Returns a gceDisk for the disk, if it is found in the specified zone. 
// If not found, returns (nil, nil) func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) { diff --git a/pkg/cloudprovider/providers/openstack/openstack_volumes.go b/pkg/cloudprovider/providers/openstack/openstack_volumes.go index f27112e9f05..7b65e522f43 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_volumes.go +++ b/pkg/cloudprovider/providers/openstack/openstack_volumes.go @@ -240,6 +240,24 @@ func (os *OpenStack) DiskIsAttached(diskName, instanceID string) (bool, error) { return false, nil } +// query if a list of volumes are attached to a compute instance +func (os *OpenStack) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + for _, diskName := range diskNames { + disk, err := os.getVolume(diskName) + if err != nil { + continue + } + if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] { + attached[diskName] = true + } + } + return attached, nil +} + // diskIsUsed returns true a disk is attached to any node. 
func (os *OpenStack) diskIsUsed(diskName string) (bool, error) { disk, err := os.getVolume(diskName) diff --git a/pkg/cloudprovider/providers/rackspace/rackspace.go b/pkg/cloudprovider/providers/rackspace/rackspace.go index f8dd0f6fbbb..b7e1a08f6de 100644 --- a/pkg/cloudprovider/providers/rackspace/rackspace.go +++ b/pkg/cloudprovider/providers/rackspace/rackspace.go @@ -660,3 +660,24 @@ func (rs *Rackspace) DiskIsAttached(diskName, instanceID string) (bool, error) { } return false, nil } + +// query if a list volumes are attached to a compute instance +func (rs *Rackspace) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + var returnedErr error + for _, diskName := range diskNames { + result, err := rs.DiskIsAttached(diskName, instanceID) + if err != nil { + returnedErr = fmt.Errorf("Error in checking disk %q attached: %v \n %v", diskName, err, returnedErr) + continue + } + if result { + attached[diskName] = true + } + + } + return attached, returnedErr +} diff --git a/pkg/cloudprovider/providers/vsphere/vsphere.go b/pkg/cloudprovider/providers/vsphere/vsphere.go index 023cd041dd1..804931a96bb 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere.go @@ -150,6 +150,10 @@ type Volumes interface { // Assumption: If node doesn't exist, disk is not attached to the node. DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (bool, error) + // DisksAreAttached checks if a list disks are attached to the given node. + // Assumption: If node doesn't exist, disks are not attached to the node. + DisksAreAttached(volPath []string, nodeName k8stypes.NodeName) (map[string]bool, error) + // CreateVolume creates a new vmdk with specified parameters. 
CreateVolume(volumeOptions *VolumeOptions) (volumePath string, err error) @@ -968,6 +972,63 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b return attached, err } +// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin. +func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create vSphere client + attached := make(map[string]bool) + for _, volPath := range volPaths { + attached[volPath] = false + } + err := vSphereLogin(vs, ctx) + if err != nil { + glog.Errorf("Failed to login into vCenter, err: %v", err) + return attached, err + } + + // Find VM to detach disk from + var vSphereInstance string + if nodeName == "" { + vSphereInstance = vs.localInstanceID + nodeName = vmNameToNodeName(vSphereInstance) + } else { + vSphereInstance = nodeNameToVMName(nodeName) + } + + nodeExist, err := vs.NodeExists(vs.client, nodeName) + + if err != nil { + glog.Errorf("Failed to check whether node exist. err: %s.", err) + return attached, err + } + + if !nodeExist { + glog.Warningf( + "Node %q does not exist. DisksAreAttached will assume vmdk %v are not attached to it.", + vSphereInstance, + volPaths) + return attached, nil + } + + // Get VM device list + _, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, vs.client, vSphereInstance) + if err != nil { + glog.Errorf("Failed to get VM devices for VM %#q. 
err: %s", vSphereInstance, err) + return attached, err + } + + for _, volPath := range volPaths { + result, _ := checkDiskAttached(volPath, vmDevices, dc, vs.client) + if result { + attached[volPath] = true + } + } + + return attached, err +} + func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) { virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client) if err != nil { @@ -978,7 +1039,7 @@ func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *o return false, err } for _, controllerType := range supportedSCSIControllerType { - controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client) + controllerkey, _ := getControllerKey(controllerType, vmdevices) if controllerkey == virtualDiskControllerKey { return true, nil } @@ -1010,7 +1071,7 @@ func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceL // Returns key of the controller. // Key is unique id that distinguishes one device from other devices in the same virtual machine. 
-func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) { +func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList) (int32, error) { for _, device := range vmDevices { devType := vmDevices.Type(device) if devType == scsiType { diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 5a994cb8a38..133b69fb383 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -56,6 +56,10 @@ const ( // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the // DesiredStateOfWorldPopulator loop waits between successive executions desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute + + // reconcilerSyncDuration is the amount of time the reconciler sync states loop + // wait between successive executions + reconcilerSyncDuration time.Duration = 5 * time.Second ) // AttachDetachController defines the operations supported by this controller. @@ -122,6 +126,7 @@ func NewAttachDetachController( adc.reconciler = reconciler.NewReconciler( reconcilerLoopPeriod, reconcilerMaxWaitForUnmountDuration, + reconcilerSyncDuration, adc.desiredStateOfWorld, adc.actualStateOfWorld, adc.attacherDetacher, diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 2826125b024..3c203bbcd2f 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -106,6 +106,8 @@ type ActualStateOfWorld interface { // based on the current actual state of the world. 
GetAttachedVolumesForNode(nodeName types.NodeName) []AttachedVolume + GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume + // GetVolumesToReportAttached returns a map containing the set of nodes for // which the VolumesAttached Status field in the Node API object should be // updated. The key in this map is the name of the node to update and the @@ -541,6 +543,25 @@ func (asw *actualStateOfWorld) GetAttachedVolumesForNode( return attachedVolumes } +func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume { + asw.RLock() + defer asw.RUnlock() + + attachedVolumesPerNode := make(map[types.NodeName][]operationexecutor.AttachedVolume) + for _, volumeObj := range asw.attachedVolumes { + for nodeName, nodeObj := range volumeObj.nodesAttachedTo { + volumes, exists := attachedVolumesPerNode[nodeName] + if !exists { + volumes = []operationexecutor.AttachedVolume{} + } + volumes = append(volumes, getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume) + attachedVolumesPerNode[nodeName] = volumes + } + } + + return attachedVolumesPerNode +} + func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]api.AttachedVolume { asw.RLock() defer asw.RUnlock() diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index 6e9816802c3..74c330cd75d 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -56,6 +56,7 @@ type Reconciler interface { func NewReconciler( loopPeriod time.Duration, maxWaitForUnmountDuration time.Duration, + syncDuration time.Duration, desiredStateOfWorld cache.DesiredStateOfWorld, actualStateOfWorld cache.ActualStateOfWorld, attacherDetacher operationexecutor.OperationExecutor, @@ -63,20 +64,24 @@ func NewReconciler( return &reconciler{ loopPeriod: loopPeriod, maxWaitForUnmountDuration: 
maxWaitForUnmountDuration, + syncDuration: syncDuration, desiredStateOfWorld: desiredStateOfWorld, actualStateOfWorld: actualStateOfWorld, attacherDetacher: attacherDetacher, nodeStatusUpdater: nodeStatusUpdater, + timeOfLastSync: time.Now(), } } type reconciler struct { loopPeriod time.Duration maxWaitForUnmountDuration time.Duration + syncDuration time.Duration desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld attacherDetacher operationexecutor.OperationExecutor nodeStatusUpdater statusupdater.NodeStatusUpdater + timeOfLastSync time.Time } func (rc *reconciler) Run(stopCh <-chan struct{}) { @@ -85,107 +90,135 @@ func (rc *reconciler) Run(stopCh <-chan struct{}) { func (rc *reconciler) reconciliationLoopFunc() func() { return func() { - // Detaches are triggered before attaches so that volumes referenced by - // pods that are rescheduled to a different node are detached first. - - // Ensure volumes that should be detached are detached. - for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { - if !rc.desiredStateOfWorld.VolumeExists( - attachedVolume.VolumeName, attachedVolume.NodeName) { - // Set the detach request time - elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) - if err != nil { - glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err) - continue - } - // Check whether timeout has reached the maximum waiting time - timeout := elapsedTime > rc.maxWaitForUnmountDuration - // Check whether volume is still mounted. 
Skip detach if it is still mounted unless timeout - if attachedVolume.MountedByNode && !timeout { - glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted", - attachedVolume.VolumeName, - attachedVolume.NodeName) - continue - } - - // Before triggering volume detach, mark volume as detached and update the node status - // If it fails to update node status, skip detach volume - rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName) - - // Update Node Status to indicate volume is no longer safe to mount. - err = rc.nodeStatusUpdater.UpdateNodeStatuses() - if err != nil { - // Skip detaching this volume if unable to update node status - glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ", - attachedVolume.VolumeName, - attachedVolume.NodeName, - err) - continue - } - - // Trigger detach volume which requires verifing safe to detach step - // If timeout is true, skip verifySafeToDetach check - glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) - verifySafeToDetach := !timeout - err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld) - if err == nil { - if !timeout { - glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) - } else { - glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", - attachedVolume.VolumeName, - attachedVolume.NodeName, - rc.maxWaitForUnmountDuration) - } - } - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. 
- // Log all other errors. - glog.Errorf( - "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", - attachedVolume.VolumeName, - attachedVolume.VolumeSpec.Name(), - attachedVolume.NodeName, - err) - } - } - } - - // Ensure volumes that should be attached are attached. - for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() { - if rc.actualStateOfWorld.VolumeNodeExists( - volumeToAttach.VolumeName, volumeToAttach.NodeName) { - // Volume/Node exists, touch it to reset detachRequestedTime - glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName) - rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) - } else { - // Volume/Node doesn't exist, spawn a goroutine to attach it - glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) - err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld) - if err == nil { - glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) - } - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. 
- glog.Errorf( - "operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v", - volumeToAttach.VolumeName, - volumeToAttach.VolumeSpec.Name(), - volumeToAttach.NodeName, - err) - } - } - } - - // Update Node Status - err := rc.nodeStatusUpdater.UpdateNodeStatuses() - if err != nil { - glog.Infof("UpdateNodeStatuses failed with: %v", err) + rc.reconcile() + // reconciler periodically checks whether the attached volumes from actual state + // are still attached to the node and udpate the status if they are not. + if time.Since(rc.timeOfLastSync) > rc.syncDuration { + rc.sync() } } } + +func (rc *reconciler) sync() { + defer rc.updateSyncTime() + rc.syncStates() +} + +func (rc *reconciler) updateSyncTime() { + rc.timeOfLastSync = time.Now() +} + +func (rc *reconciler) syncStates() { + volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode() + for nodeName, volumes := range volumesPerNode { + err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld) + if err != nil { + glog.Errorf("Error in syncing states for volumes: %v", err) + } + } +} + +func (rc *reconciler) reconcile() { + // Detaches are triggered before attaches so that volumes referenced by + // pods that are rescheduled to a different node are detached first. + + // Ensure volumes that should be detached are detached. 
+ for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { + if !rc.desiredStateOfWorld.VolumeExists( + attachedVolume.VolumeName, attachedVolume.NodeName) { + // Set the detach request time + elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) + if err != nil { + glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err) + continue + } + // Check whether timeout has reached the maximum waiting time + timeout := elapsedTime > rc.maxWaitForUnmountDuration + // Check whether volume is still mounted. Skip detach if it is still mounted unless timeout + if attachedVolume.MountedByNode && !timeout { + glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted", + attachedVolume.VolumeName, + attachedVolume.NodeName) + continue + } + + // Before triggering volume detach, mark volume as detached and update the node status + // If it fails to update node status, skip detach volume + rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName) + + // Update Node Status to indicate volume is no longer safe to mount. 
+ err = rc.nodeStatusUpdater.UpdateNodeStatuses() + if err != nil { + // Skip detaching this volume if unable to update node status + glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ", + attachedVolume.VolumeName, + attachedVolume.NodeName, + err) + continue + } + + // Trigger detach volume which requires verifing safe to detach step + // If timeout is true, skip verifySafeToDetach check + glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) + verifySafeToDetach := !timeout + err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld) + if err == nil { + if !timeout { + glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) + } else { + glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", + attachedVolume.VolumeName, + attachedVolume.NodeName, + rc.maxWaitForUnmountDuration) + } + } + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf( + "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name(), + attachedVolume.NodeName, + err) + } + } + } + + // Ensure volumes that should be attached are attached. 
+ for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() { + if rc.actualStateOfWorld.VolumeNodeExists( + volumeToAttach.VolumeName, volumeToAttach.NodeName) { + // Volume/Node exists, touch it to reset detachRequestedTime + glog.V(1).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName) + rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) + } else { + // Volume/Node doesn't exist, spawn a goroutine to attach it + glog.V(1).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) + err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld) + if err == nil { + glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) + } + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. 
+ glog.Errorf( + "operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v", + volumeToAttach.VolumeName, + volumeToAttach.VolumeSpec.Name(), + volumeToAttach.NodeName, + err) + } + } + } + + // Update Node Status + err := rc.nodeStatusUpdater.UpdateNodeStatuses() + if err != nil { + glog.Infof("UpdateNodeStatuses failed with: %v", err) + } +} diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index a23a34aa50d..20de8df3960 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -35,6 +35,7 @@ import ( const ( reconcilerLoopPeriod time.Duration = 0 * time.Millisecond + syncLoopPeriod time.Duration = 100 * time.Minute maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond resyncPeriod time.Duration = 5 * time.Minute ) @@ -55,7 +56,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { nsu := statusupdater.NewNodeStatusUpdater( fakeKubeClient, nodeInformer, asw) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) // Act ch := make(chan struct{}) @@ -83,7 +84,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -129,7 +130,7 @@ func 
Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -196,7 +197,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -263,7 +264,7 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) diff --git a/pkg/volume/aws_ebs/attacher.go b/pkg/volume/aws_ebs/attacher.go index 328d45ed71f..59e2566a8f3 100644 --- a/pkg/volume/aws_ebs/attacher.go +++ b/pkg/volume/aws_ebs/attacher.go @@ -77,6 
+77,40 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, nodeName return devicePath, nil } +func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumeIDList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumeIDList = append(volumeIDList, volumeSource.VolumeID) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.VolumeID] = spec + } + attachedResult, err := attacher.awsVolumes.DisksAreAttached(volumeIDList, nodeName) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if volumes (%v) is already attached to current node (%q). err=%v", + volumeIDList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumeID, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumeID] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *awsElasticBlockStoreAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { volumeSource, _, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/aws_ebs/attacher_test.go b/pkg/volume/aws_ebs/attacher_test.go index 1615a371836..d3b8cf98567 100644 --- a/pkg/volume/aws_ebs/attacher_test.go +++ b/pkg/volume/aws_ebs/attacher_test.go @@ -317,6 +317,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam return expected.isAttached, expected.ret } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) 
(map[string]bool, error) { + return nil, errors.New("Not implemented") +} + func (testcase *testcase) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName string, err error) { return "", errors.New("Not implemented") } diff --git a/pkg/volume/azure_dd/attacher.go b/pkg/volume/azure_dd/attacher.go index dfb12b9f0c4..ed99cfa4bc6 100644 --- a/pkg/volume/azure_dd/attacher.go +++ b/pkg/volume/azure_dd/attacher.go @@ -115,6 +115,40 @@ func (attacher *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.Node return strconv.Itoa(int(lun)), err } +func (attacher *azureDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumeIDList := []string{} + for _, spec := range specs { + volumeSource, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumeIDList = append(volumeIDList, volumeSource.DiskName) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.DiskName] = spec + } + attachedResult, err := attacher.azureProvider.DisksAreAttached(volumeIDList, nodeName) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if volumes (%v) are attached to current node (%q). err=%v", + volumeIDList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumeID, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumeID] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + // WaitForAttach runs on the node to detect if the volume (referenced by LUN) is attached. 
If attached, the device path is returned func (attacher *azureDiskAttacher) WaitForAttach(spec *volume.Spec, lunStr string, timeout time.Duration) (string, error) { volumeSource, err := getVolumeSource(spec) diff --git a/pkg/volume/azure_dd/azure_dd.go b/pkg/volume/azure_dd/azure_dd.go index 2ae776fbd29..66fb31266a1 100644 --- a/pkg/volume/azure_dd/azure_dd.go +++ b/pkg/volume/azure_dd/azure_dd.go @@ -53,6 +53,8 @@ type azureCloudProvider interface { AttachDisk(diskName, diskUri string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error // Detaches the disk, identified by disk name or uri, from the host machine. DetachDiskByName(diskName, diskUri string, nodeName types.NodeName) error + // Check if a list of volumes are attached to the node with the specified NodeName + DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) // Get the LUN number of the disk that is attached to the host GetDiskLun(diskName, diskUri string, nodeName types.NodeName) (int32, error) // Get the next available LUN number to attach a new VHD diff --git a/pkg/volume/cinder/attacher.go b/pkg/volume/cinder/attacher.go index 38e2c3c8dd2..064ab26a60b 100644 --- a/pkg/volume/cinder/attacher.go +++ b/pkg/volume/cinder/attacher.go @@ -109,6 +109,40 @@ func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.Nod return devicePath, err } +func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumeIDList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumeIDList = append(volumeIDList, volumeSource.VolumeID) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.VolumeID] = spec 
+ } + attachedResult, err := attacher.cinderProvider.DisksAreAttached(volumeIDList, string(nodeName)) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if Volumes (%v) are already attached to current node (%q). Will continue and try attach anyway. err=%v", + volumeIDList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumeID, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumeID] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { volumeSource, _, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/cinder/attacher_test.go b/pkg/volume/cinder/attacher_test.go index 2191c1181c2..ded3156ebee 100644 --- a/pkg/volume/cinder/attacher_test.go +++ b/pkg/volume/cinder/attacher_test.go @@ -417,6 +417,10 @@ func (testcase *testcase) Instances() (cloudprovider.Instances, bool) { return &instances{testcase.instanceID}, true } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error) { + return nil, errors.New("Not implemented") +} + // Implementation of fake cloudprovider.Instances type instances struct { instanceID string diff --git a/pkg/volume/cinder/cinder.go b/pkg/volume/cinder/cinder.go index 3aef51fc63e..817c4db486e 100644 --- a/pkg/volume/cinder/cinder.go +++ b/pkg/volume/cinder/cinder.go @@ -50,6 +50,7 @@ type CinderProvider interface { InstanceID() (string, error) GetAttachmentDiskPath(instanceID string, diskName string) (string, error) DiskIsAttached(diskName, instanceID string) (bool, error) + DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) Instances() (cloudprovider.Instances, bool) } diff --git 
a/pkg/volume/gce_pd/attacher.go b/pkg/volume/gce_pd/attacher.go index c159ec01b88..190e27acaff 100644 --- a/pkg/volume/gce_pd/attacher.go +++ b/pkg/volume/gce_pd/attacher.go @@ -96,6 +96,40 @@ func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, nodeName ty return path.Join(diskByIdPath, diskGooglePrefix+pdName), nil } +func (attacher *gcePersistentDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumePdNameMap := make(map[string]*volume.Spec) + pdNameList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + // If an error occurs, skip this volume and move to the next one + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + pdNameList = append(pdNameList, volumeSource.PDName) + volumesAttachedCheck[spec] = true + volumePdNameMap[volumeSource.PDName] = spec + } + attachedResult, err := attacher.gceDisks.DisksAreAttached(pdNameList, nodeName) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if PDs (%v) are already attached to current node (%q). 
err=%v", + pdNameList, nodeName, err) + return volumesAttachedCheck, err + } + + for pdName, attached := range attachedResult { + if !attached { + spec := volumePdNameMap[pdName] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", pdName, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *gcePersistentDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { ticker := time.NewTicker(checkSleepDuration) defer ticker.Stop() diff --git a/pkg/volume/gce_pd/attacher_test.go b/pkg/volume/gce_pd/attacher_test.go index 6a80f93b2f2..2ac3c94250e 100644 --- a/pkg/volume/gce_pd/attacher_test.go +++ b/pkg/volume/gce_pd/attacher_test.go @@ -353,6 +353,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam return expected.isAttached, expected.ret } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + return nil, errors.New("Not implemented") +} + func (testcase *testcase) CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error { return errors.New("Not implemented") } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index f16e83da037..8ccc405a8d1 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -423,6 +423,12 @@ func (fv *FakeVolume) Detach(deviceMountPath string, nodeName types.NodeName) er return nil } +func (fv *FakeVolume) VolumesAreAttached(spec []*Spec, nodeName types.NodeName) (map[*Spec]bool, error) { + fv.Lock() + defer fv.Unlock() + return nil, nil +} + func (fv *FakeVolume) GetDetachCallCount() int { fv.RLock() defer fv.RUnlock() diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 266b6cdf054..c992f84b051 100644 --- 
a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -38,6 +38,9 @@ import ( const ( // emptyUniquePodName is a UniquePodName for empty string. emptyUniquePodName types.UniquePodName = types.UniquePodName("") + + // emptyUniqueVolumeName is a UniqueVolumeName for empty string + emptyUniqueVolumeName api.UniqueVolumeName = api.UniqueVolumeName("") ) // NestedPendingOperations defines the supported set of operations. @@ -151,10 +154,16 @@ func (grm *nestedPendingOperations) IsOperationPending( return false } +// This is an internal function and caller should acquire and release the lock func (grm *nestedPendingOperations) isOperationExists( volumeName api.UniqueVolumeName, podName types.UniquePodName) (bool, int) { + // If volumeName is empty, operation can be executed concurrently + if volumeName == emptyUniqueVolumeName { + return false, -1 + } + for previousOpIndex, previousOp := range grm.operations { if previousOp.volumeName != volumeName { // No match, keep searching diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 2e15b8d39a3..3651a353606 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -64,6 +64,13 @@ type OperationExecutor interface { // It then updates the actual state of the world to reflect that. AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error + // VerifyVolumesAreAttached verifies the given list of volumes to see whether they are still attached to the node. + // If any volume is not attached right now, it will update the actual state of the world to reflect that. + // Note that this operation could be operated concurrently with other attach/detach operations. 
+ // In theory (but very unlikely in practice), race condition among these operations might mark volume as detached + // even if it is attached. But reconciler can correct this in a short period of time. + VerifyVolumesAreAttached(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error + // DetachVolume detaches the volume from the node specified in // volumeToDetach, and updates the actual state of the world to reflect // that. If verifySafeToDetach is set, a call is made to the fetch the node @@ -397,6 +404,19 @@ func (oe *operationExecutor) DetachVolume( volumeToDetach.VolumeName, "" /* podName */, detachFunc) } +func (oe *operationExecutor) VerifyVolumesAreAttached( + attachedVolumes []AttachedVolume, + nodeName types.NodeName, + actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { + volumesAreAttachedFunc, err := + oe.generateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld) + if err != nil { + return err + } + // Give an empty UniqueVolumeName so that this operation could be executed concurrently. 
+ return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc) +} + func (oe *operationExecutor) MountVolume( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, @@ -465,6 +485,83 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc) } +func (oe *operationExecutor) generateVolumesAreAttachedFunc( + attachedVolumes []AttachedVolume, + nodeName types.NodeName, + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + + // volumesPerPlugin maps from a volume plugin to a list of volume specs which belong + // to this type of plugin + volumesPerPlugin := make(map[string][]*volume.Spec) + // volumeSpecMap maps from a volume spec to its unique volumeName which will be used + // when calling MarkVolumeAsDetached + volumeSpecMap := make(map[*volume.Spec]api.UniqueVolumeName) + // Iterate each volume spec and put them into a map index by the pluginName + for _, volumeAttached := range attachedVolumes { + volumePlugin, err := + oe.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec) + if err != nil || volumePlugin == nil { + glog.Errorf( + "VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v", + volumeAttached.VolumeName, + volumeAttached.VolumeSpec.Name(), + volumeAttached.NodeName, + err) + } + volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()] + if !pluginExists { + volumeSpecList = []*volume.Spec{} + } + volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec) + volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList + volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName + } + + return func() error { + + // For each volume plugin, pass the list of volume specs to VolumesAreAttached to check + // whether the volumes are still attached. 
+ for pluginName, volumesSpecs := range volumesPerPlugin { + attachableVolumePlugin, err := + oe.volumePluginMgr.FindAttachablePluginByName(pluginName) + if err != nil || attachableVolumePlugin == nil { + glog.Errorf( + "VolumesAreAttached.FindAttachablePluginByName failed for plugin %q with: %v", + pluginName, + err) + continue + } + + volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() + if newAttacherErr != nil { + glog.Errorf( + "VolumesAreAttached failed for getting plugin %q with: %v", + pluginName, + newAttacherErr) + continue + } + + attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName) + if areAttachedErr != nil { + glog.Errorf( + "VolumesAreAttached failed for checking on node %q with: %v", + nodeName, + areAttachedErr) + continue + } + + for spec, check := range attached { + if !check { + actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName) + glog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.", + volumeSpecMap[spec], spec.Name(), nodeName) + } + } + } + return nil + }, nil +} + func (oe *operationExecutor) generateAttachVolumeFunc( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 13ff21414fa..54081c54534 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -146,6 +146,11 @@ type Attacher interface { // node. Attach(spec *Spec, nodeName types.NodeName) (string, error) + // VolumesAreAttached checks whether the listed volumes are still attached to the specified + // node. It returns a map which maps from the volume spec to the checking result. + // If an error occurs during checking, the error will be returned + VolumesAreAttached(specs []*Spec, nodeName types.NodeName) (map[*Spec]bool, error) + // WaitForAttach blocks until the device is attached to this // node. 
If it successfully attaches, the path to the device // is returned. Otherwise, if the device does not attach after diff --git a/pkg/volume/vsphere_volume/attacher.go b/pkg/volume/vsphere_volume/attacher.go index 49811987532..d722b17eafc 100644 --- a/pkg/volume/vsphere_volume/attacher.go +++ b/pkg/volume/vsphere_volume/attacher.go @@ -84,6 +84,39 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No return path.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil } +func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumePathList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumePathList = append(volumePathList, volumeSource.VolumePath) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.VolumePath] = spec + } + attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathList, nodeName) + if err != nil { + glog.Errorf( + "Error checking if volumes (%v) are attached to current node (%q). 
err=%v", + volumePathList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumePath, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumePath] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumePath, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { volumeSource, _, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/vsphere_volume/attacher_test.go b/pkg/volume/vsphere_volume/attacher_test.go index ec2b657dafd..e029b36a2e4 100644 --- a/pkg/volume/vsphere_volume/attacher_test.go +++ b/pkg/volume/vsphere_volume/attacher_test.go @@ -308,6 +308,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam return expected.isAttached, expected.ret } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + return nil, errors.New("Not implemented") +} + func (testcase *testcase) CreateVolume(volumeOptions *vsphere.VolumeOptions) (volumePath string, err error) { return "", errors.New("Not implemented") } diff --git a/plugin/pkg/admission/persistentvolume/label/admission_test.go b/plugin/pkg/admission/persistentvolume/label/admission_test.go index f2a848955e2..edd33b34bf6 100644 --- a/plugin/pkg/admission/persistentvolume/label/admission_test.go +++ b/plugin/pkg/admission/persistentvolume/label/admission_test.go @@ -62,6 +62,10 @@ func (c *mockVolumes) DiskIsAttached(volumeName string, nodeName types.NodeName) return false, fmt.Errorf("not implemented") } +func (c *mockVolumes) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + return nil, fmt.Errorf("not implemented") +} + func mockVolumeFailure(err error) *mockVolumes { return &mockVolumes{volumeLabelsError: err} }