diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index bc2968526a0..ae58e72d3a8 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -321,6 +321,9 @@ type Volumes interface { // Check if the volume is already attached to the node with the specified NodeName DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) + + // Check if a list of volumes are attached to the node with the specified NodeName + DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) } // InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups @@ -1774,6 +1777,41 @@ func (c *Cloud) DiskIsAttached(diskName string, nodeName types.NodeName) (bool, return false, nil } +func (c *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + awsInstance, err := c.getAwsInstance(nodeName) + if err != nil { + if err == cloudprovider.InstanceNotFound { + // If instance no longer exists, safe to assume volume is not attached. + glog.Warningf( + "Node %q does not exist. 
DisksAreAttached will assume disks %v are not attached to it.", + nodeName, + diskNames) + return attached, nil + } + + return attached, err + } + info, err := awsInstance.describeInstance() + if err != nil { + return attached, err + } + for _, blockDevice := range info.BlockDeviceMappings { + volumeId := aws.StringValue(blockDevice.Ebs.VolumeId) + for _, diskName := range diskNames { + if volumeId == diskName { + // Disk is still attached to node + attached[diskName] = true + } + } + } + + return attached, nil +} + // Gets the current load balancer state func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) { request := &elb.DescribeLoadBalancersInput{} diff --git a/pkg/cloudprovider/providers/azure/azure_storage.go b/pkg/cloudprovider/providers/azure/azure_storage.go index 7f9d5276a18..d16c6a65840 100644 --- a/pkg/cloudprovider/providers/azure/azure_storage.go +++ b/pkg/cloudprovider/providers/azure/azure_storage.go @@ -75,6 +75,34 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l return err } +// DisksAreAttached checks if a list of volumes are attached to the node with the specified NodeName +func (az *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + vm, exists, err := az.getVirtualMachine(nodeName) + if !exists { + // if host doesn't exist, no need to detach + glog.Warningf("Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.", + nodeName, diskNames) + return attached, nil + } else if err != nil { + return attached, err + } + + disks := *vm.Properties.StorageProfile.DataDisks + for _, disk := range disks { + for _, diskName := range diskNames { + if disk.Name != nil && diskName != "" && *disk.Name == diskName { + attached[diskName] = true + } + } + } + + return attached, nil +} + // DetachDiskByName detaches 
a vhd from host // the vhd can be identified by diskName or diskURI func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error { diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index dcc1d74f347..126feec37d9 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -133,6 +133,10 @@ type Disks interface { // DiskIsAttached checks if a disk is attached to the node with the specified NodeName. DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) + // DisksAreAttached is a batch function to check if a list of disks are attached + // to the node with the specified NodeName. + DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) + // CreateDisk creates a new PD with given properties. Tags are serialized // as JSON into Description field. CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error @@ -2651,6 +2655,38 @@ func (gce *GCECloud) DiskIsAttached(diskName string, nodeName types.NodeName) (b return false, nil } +func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + instanceName := mapNodeNameToInstanceName(nodeName) + instance, err := gce.getInstanceByName(instanceName) + if err != nil { + if err == cloudprovider.InstanceNotFound { + // If instance no longer exists, safe to assume volume is not attached. + glog.Warningf( + "Instance %q does not exist. 
DisksAreAttached will assume PD %v are not attached to it.", + instanceName, + diskNames) + return attached, nil + } + + return attached, err + } + + for _, instanceDisk := range instance.Disks { + for _, diskName := range diskNames { + if instanceDisk.DeviceName == diskName { + // Disk is still attached to node + attached[diskName] = true + } + } + } + + return attached, nil +} + // Returns a gceDisk for the disk, if it is found in the specified zone. // If not found, returns (nil, nil) func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) { diff --git a/pkg/cloudprovider/providers/openstack/openstack_volumes.go b/pkg/cloudprovider/providers/openstack/openstack_volumes.go index f27112e9f05..7b65e522f43 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_volumes.go +++ b/pkg/cloudprovider/providers/openstack/openstack_volumes.go @@ -240,6 +240,24 @@ func (os *OpenStack) DiskIsAttached(diskName, instanceID string) (bool, error) { return false, nil } +// query if a list of volumes are attached to a compute instance +func (os *OpenStack) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + for _, diskName := range diskNames { + disk, err := os.getVolume(diskName) + if err != nil { + continue + } + if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] { + attached[diskName] = true + } + } + return attached, nil +} + // diskIsUsed returns true a disk is attached to any node. 
func (os *OpenStack) diskIsUsed(diskName string) (bool, error) { disk, err := os.getVolume(diskName) diff --git a/pkg/cloudprovider/providers/rackspace/rackspace.go b/pkg/cloudprovider/providers/rackspace/rackspace.go index f8dd0f6fbbb..b7e1a08f6de 100644 --- a/pkg/cloudprovider/providers/rackspace/rackspace.go +++ b/pkg/cloudprovider/providers/rackspace/rackspace.go @@ -660,3 +660,24 @@ func (rs *Rackspace) DiskIsAttached(diskName, instanceID string) (bool, error) { } return false, nil } + +// query if a list volumes are attached to a compute instance +func (rs *Rackspace) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) { + attached := make(map[string]bool) + for _, diskName := range diskNames { + attached[diskName] = false + } + var returnedErr error + for _, diskName := range diskNames { + result, err := rs.DiskIsAttached(diskName, instanceID) + if err != nil { + returnedErr = fmt.Errorf("Error in checking disk %q attached: %v \n %v", diskName, err, returnedErr) + continue + } + if result { + attached[diskName] = true + } + + } + return attached, returnedErr +} diff --git a/pkg/cloudprovider/providers/vsphere/vsphere.go b/pkg/cloudprovider/providers/vsphere/vsphere.go index 023cd041dd1..804931a96bb 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere.go @@ -150,6 +150,10 @@ type Volumes interface { // Assumption: If node doesn't exist, disk is not attached to the node. DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (bool, error) + // DisksAreAttached checks if a list disks are attached to the given node. + // Assumption: If node doesn't exist, disks are not attached to the node. + DisksAreAttached(volPath []string, nodeName k8stypes.NodeName) (map[string]bool, error) + // CreateVolume creates a new vmdk with specified parameters. 
CreateVolume(volumeOptions *VolumeOptions) (volumePath string, err error) @@ -968,6 +972,63 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b return attached, err } +// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin. +func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create vSphere client + attached := make(map[string]bool) + for _, volPath := range volPaths { + attached[volPath] = false + } + err := vSphereLogin(vs, ctx) + if err != nil { + glog.Errorf("Failed to login into vCenter, err: %v", err) + return attached, err + } + + // Find VM to detach disk from + var vSphereInstance string + if nodeName == "" { + vSphereInstance = vs.localInstanceID + nodeName = vmNameToNodeName(vSphereInstance) + } else { + vSphereInstance = nodeNameToVMName(nodeName) + } + + nodeExist, err := vs.NodeExists(vs.client, nodeName) + + if err != nil { + glog.Errorf("Failed to check whether node exist. err: %s.", err) + return attached, err + } + + if !nodeExist { + glog.Warningf( + "Node %q does not exist. DisksAreAttached will assume vmdk %v are not attached to it.", + vSphereInstance, + volPaths) + return attached, nil + } + + // Get VM device list + _, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, vs.client, vSphereInstance) + if err != nil { + glog.Errorf("Failed to get VM devices for VM %#q. 
err: %s", vSphereInstance, err) + return attached, err + } + + for _, volPath := range volPaths { + result, _ := checkDiskAttached(volPath, vmDevices, dc, vs.client) + if result { + attached[volPath] = true + } + } + + return attached, err +} + func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) { virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client) if err != nil { @@ -978,7 +1039,7 @@ func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *o return false, err } for _, controllerType := range supportedSCSIControllerType { - controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client) + controllerkey, _ := getControllerKey(controllerType, vmdevices) if controllerkey == virtualDiskControllerKey { return true, nil } @@ -1010,7 +1071,7 @@ func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceL // Returns key of the controller. // Key is unique id that distinguishes one device from other devices in the same virtual machine. 
-func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) { +func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList) (int32, error) { for _, device := range vmDevices { devType := vmDevices.Type(device) if devType == scsiType { diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 5a994cb8a38..133b69fb383 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -56,6 +56,10 @@ const ( // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the // DesiredStateOfWorldPopulator loop waits between successive executions desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute + + // reconcilerSyncDuration is the amount of time the reconciler sync states loop + // wait between successive executions + reconcilerSyncDuration time.Duration = 5 * time.Second ) // AttachDetachController defines the operations supported by this controller. @@ -122,6 +126,7 @@ func NewAttachDetachController( adc.reconciler = reconciler.NewReconciler( reconcilerLoopPeriod, reconcilerMaxWaitForUnmountDuration, + reconcilerSyncDuration, adc.desiredStateOfWorld, adc.actualStateOfWorld, adc.attacherDetacher, diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 2826125b024..3c203bbcd2f 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -106,6 +106,8 @@ type ActualStateOfWorld interface { // based on the current actual state of the world. 
GetAttachedVolumesForNode(nodeName types.NodeName) []AttachedVolume + GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume + // GetVolumesToReportAttached returns a map containing the set of nodes for // which the VolumesAttached Status field in the Node API object should be // updated. The key in this map is the name of the node to update and the @@ -541,6 +543,25 @@ func (asw *actualStateOfWorld) GetAttachedVolumesForNode( return attachedVolumes } +func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume { + asw.RLock() + defer asw.RUnlock() + + attachedVolumesPerNode := make(map[types.NodeName][]operationexecutor.AttachedVolume) + for _, volumeObj := range asw.attachedVolumes { + for nodeName, nodeObj := range volumeObj.nodesAttachedTo { + volumes, exists := attachedVolumesPerNode[nodeName] + if !exists { + volumes = []operationexecutor.AttachedVolume{} + } + volumes = append(volumes, getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume) + attachedVolumesPerNode[nodeName] = volumes + } + } + + return attachedVolumesPerNode +} + func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]api.AttachedVolume { asw.RLock() defer asw.RUnlock() diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index 6e9816802c3..74c330cd75d 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -56,6 +56,7 @@ type Reconciler interface { func NewReconciler( loopPeriod time.Duration, maxWaitForUnmountDuration time.Duration, + syncDuration time.Duration, desiredStateOfWorld cache.DesiredStateOfWorld, actualStateOfWorld cache.ActualStateOfWorld, attacherDetacher operationexecutor.OperationExecutor, @@ -63,20 +64,24 @@ func NewReconciler( return &reconciler{ loopPeriod: loopPeriod, maxWaitForUnmountDuration: 
maxWaitForUnmountDuration, + syncDuration: syncDuration, desiredStateOfWorld: desiredStateOfWorld, actualStateOfWorld: actualStateOfWorld, attacherDetacher: attacherDetacher, nodeStatusUpdater: nodeStatusUpdater, + timeOfLastSync: time.Now(), } } type reconciler struct { loopPeriod time.Duration maxWaitForUnmountDuration time.Duration + syncDuration time.Duration desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld attacherDetacher operationexecutor.OperationExecutor nodeStatusUpdater statusupdater.NodeStatusUpdater + timeOfLastSync time.Time } func (rc *reconciler) Run(stopCh <-chan struct{}) { @@ -85,107 +90,135 @@ func (rc *reconciler) Run(stopCh <-chan struct{}) { func (rc *reconciler) reconciliationLoopFunc() func() { return func() { - // Detaches are triggered before attaches so that volumes referenced by - // pods that are rescheduled to a different node are detached first. - - // Ensure volumes that should be detached are detached. - for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { - if !rc.desiredStateOfWorld.VolumeExists( - attachedVolume.VolumeName, attachedVolume.NodeName) { - // Set the detach request time - elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) - if err != nil { - glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err) - continue - } - // Check whether timeout has reached the maximum waiting time - timeout := elapsedTime > rc.maxWaitForUnmountDuration - // Check whether volume is still mounted. 
Skip detach if it is still mounted unless timeout - if attachedVolume.MountedByNode && !timeout { - glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted", - attachedVolume.VolumeName, - attachedVolume.NodeName) - continue - } - - // Before triggering volume detach, mark volume as detached and update the node status - // If it fails to update node status, skip detach volume - rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName) - - // Update Node Status to indicate volume is no longer safe to mount. - err = rc.nodeStatusUpdater.UpdateNodeStatuses() - if err != nil { - // Skip detaching this volume if unable to update node status - glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ", - attachedVolume.VolumeName, - attachedVolume.NodeName, - err) - continue - } - - // Trigger detach volume which requires verifing safe to detach step - // If timeout is true, skip verifySafeToDetach check - glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) - verifySafeToDetach := !timeout - err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld) - if err == nil { - if !timeout { - glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) - } else { - glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", - attachedVolume.VolumeName, - attachedVolume.NodeName, - rc.maxWaitForUnmountDuration) - } - } - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. 
- // Log all other errors. - glog.Errorf( - "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", - attachedVolume.VolumeName, - attachedVolume.VolumeSpec.Name(), - attachedVolume.NodeName, - err) - } - } - } - - // Ensure volumes that should be attached are attached. - for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() { - if rc.actualStateOfWorld.VolumeNodeExists( - volumeToAttach.VolumeName, volumeToAttach.NodeName) { - // Volume/Node exists, touch it to reset detachRequestedTime - glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName) - rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) - } else { - // Volume/Node doesn't exist, spawn a goroutine to attach it - glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) - err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld) - if err == nil { - glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) - } - if err != nil && - !nestedpendingoperations.IsAlreadyExists(err) && - !exponentialbackoff.IsExponentialBackoff(err) { - // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. - // Log all other errors. 
- glog.Errorf( - "operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v", - volumeToAttach.VolumeName, - volumeToAttach.VolumeSpec.Name(), - volumeToAttach.NodeName, - err) - } - } - } - - // Update Node Status - err := rc.nodeStatusUpdater.UpdateNodeStatuses() - if err != nil { - glog.Infof("UpdateNodeStatuses failed with: %v", err) + rc.reconcile() + // reconciler periodically checks whether the attached volumes from actual state + // are still attached to the node and udpate the status if they are not. + if time.Since(rc.timeOfLastSync) > rc.syncDuration { + rc.sync() } } } + +func (rc *reconciler) sync() { + defer rc.updateSyncTime() + rc.syncStates() +} + +func (rc *reconciler) updateSyncTime() { + rc.timeOfLastSync = time.Now() +} + +func (rc *reconciler) syncStates() { + volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode() + for nodeName, volumes := range volumesPerNode { + err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld) + if err != nil { + glog.Errorf("Error in syncing states for volumes: %v", err) + } + } +} + +func (rc *reconciler) reconcile() { + // Detaches are triggered before attaches so that volumes referenced by + // pods that are rescheduled to a different node are detached first. + + // Ensure volumes that should be detached are detached. 
+ for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { + if !rc.desiredStateOfWorld.VolumeExists( + attachedVolume.VolumeName, attachedVolume.NodeName) { + // Set the detach request time + elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName) + if err != nil { + glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err) + continue + } + // Check whether timeout has reached the maximum waiting time + timeout := elapsedTime > rc.maxWaitForUnmountDuration + // Check whether volume is still mounted. Skip detach if it is still mounted unless timeout + if attachedVolume.MountedByNode && !timeout { + glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted", + attachedVolume.VolumeName, + attachedVolume.NodeName) + continue + } + + // Before triggering volume detach, mark volume as detached and update the node status + // If it fails to update node status, skip detach volume + rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName) + + // Update Node Status to indicate volume is no longer safe to mount. 
+ err = rc.nodeStatusUpdater.UpdateNodeStatuses() + if err != nil { + // Skip detaching this volume if unable to update node status + glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ", + attachedVolume.VolumeName, + attachedVolume.NodeName, + err) + continue + } + + // Trigger detach volume which requires verifing safe to detach step + // If timeout is true, skip verifySafeToDetach check + glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) + verifySafeToDetach := !timeout + err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld) + if err == nil { + if !timeout { + glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) + } else { + glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", + attachedVolume.VolumeName, + attachedVolume.NodeName, + rc.maxWaitForUnmountDuration) + } + } + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf( + "operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v", + attachedVolume.VolumeName, + attachedVolume.VolumeSpec.Name(), + attachedVolume.NodeName, + err) + } + } + } + + // Ensure volumes that should be attached are attached. 
+ for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() { + if rc.actualStateOfWorld.VolumeNodeExists( + volumeToAttach.VolumeName, volumeToAttach.NodeName) { + // Volume/Node exists, touch it to reset detachRequestedTime + glog.V(1).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName) + rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) + } else { + // Volume/Node doesn't exist, spawn a goroutine to attach it + glog.V(1).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) + err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld) + if err == nil { + glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) + } + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. 
+ glog.Errorf( + "operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v", + volumeToAttach.VolumeName, + volumeToAttach.VolumeSpec.Name(), + volumeToAttach.NodeName, + err) + } + } + } + + // Update Node Status + err := rc.nodeStatusUpdater.UpdateNodeStatuses() + if err != nil { + glog.Infof("UpdateNodeStatuses failed with: %v", err) + } +} diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index a23a34aa50d..20de8df3960 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -35,6 +35,7 @@ import ( const ( reconcilerLoopPeriod time.Duration = 0 * time.Millisecond + syncLoopPeriod time.Duration = 100 * time.Minute maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond resyncPeriod time.Duration = 5 * time.Minute ) @@ -55,7 +56,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { nsu := statusupdater.NewNodeStatusUpdater( fakeKubeClient, nodeInformer, asw) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) // Act ch := make(chan struct{}) @@ -83,7 +84,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -129,7 +130,7 @@ func 
Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -196,7 +197,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -263,7 +264,7 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder) nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) podName := "pod-uid" volumeName := api.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) diff --git a/pkg/volume/aws_ebs/attacher.go b/pkg/volume/aws_ebs/attacher.go index 328d45ed71f..59e2566a8f3 100644 --- a/pkg/volume/aws_ebs/attacher.go +++ b/pkg/volume/aws_ebs/attacher.go @@ -77,6 
+77,40 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, nodeName return devicePath, nil } +func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumeIDList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumeIDList = append(volumeIDList, volumeSource.VolumeID) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.VolumeID] = spec + } + attachedResult, err := attacher.awsVolumes.DisksAreAttached(volumeIDList, nodeName) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if volumes (%v) is already attached to current node (%q). err=%v", + volumeIDList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumeID, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumeID] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *awsElasticBlockStoreAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { volumeSource, _, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/aws_ebs/attacher_test.go b/pkg/volume/aws_ebs/attacher_test.go index 1615a371836..d3b8cf98567 100644 --- a/pkg/volume/aws_ebs/attacher_test.go +++ b/pkg/volume/aws_ebs/attacher_test.go @@ -317,6 +317,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam return expected.isAttached, expected.ret } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) 
(map[string]bool, error) { + return nil, errors.New("Not implemented") +} + func (testcase *testcase) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName string, err error) { return "", errors.New("Not implemented") } diff --git a/pkg/volume/azure_dd/attacher.go b/pkg/volume/azure_dd/attacher.go index dfb12b9f0c4..ed99cfa4bc6 100644 --- a/pkg/volume/azure_dd/attacher.go +++ b/pkg/volume/azure_dd/attacher.go @@ -115,6 +115,40 @@ func (attacher *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.Node return strconv.Itoa(int(lun)), err } +func (attacher *azureDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumeIDList := []string{} + for _, spec := range specs { + volumeSource, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumeIDList = append(volumeIDList, volumeSource.DiskName) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.DiskName] = spec + } + attachedResult, err := attacher.azureProvider.DisksAreAttached(volumeIDList, nodeName) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if volumes (%v) are attached to current node (%q). err=%v", + volumeIDList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumeID, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumeID] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + // WaitForAttach runs on the node to detect if the volume (referenced by LUN) is attached. 
If attached, the device path is returned func (attacher *azureDiskAttacher) WaitForAttach(spec *volume.Spec, lunStr string, timeout time.Duration) (string, error) { volumeSource, err := getVolumeSource(spec) diff --git a/pkg/volume/azure_dd/azure_dd.go b/pkg/volume/azure_dd/azure_dd.go index 2ae776fbd29..66fb31266a1 100644 --- a/pkg/volume/azure_dd/azure_dd.go +++ b/pkg/volume/azure_dd/azure_dd.go @@ -53,6 +53,8 @@ type azureCloudProvider interface { AttachDisk(diskName, diskUri string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error // Detaches the disk, identified by disk name or uri, from the host machine. DetachDiskByName(diskName, diskUri string, nodeName types.NodeName) error + // Check if a list of volumes are attached to the node with the specified NodeName + DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) // Get the LUN number of the disk that is attached to the host GetDiskLun(diskName, diskUri string, nodeName types.NodeName) (int32, error) // Get the next available LUN number to attach a new VHD diff --git a/pkg/volume/cinder/attacher.go b/pkg/volume/cinder/attacher.go index 38e2c3c8dd2..064ab26a60b 100644 --- a/pkg/volume/cinder/attacher.go +++ b/pkg/volume/cinder/attacher.go @@ -109,6 +109,40 @@ func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.Nod return devicePath, err } +func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumeIDList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumeIDList = append(volumeIDList, volumeSource.VolumeID) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.VolumeID] = spec 
+ } + attachedResult, err := attacher.cinderProvider.DisksAreAttached(volumeIDList, string(nodeName)) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if Volumes (%v) are already attached to current node (%q). Will continue and try attach anyway. err=%v", + volumeIDList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumeID, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumeID] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { volumeSource, _, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/cinder/attacher_test.go b/pkg/volume/cinder/attacher_test.go index 2191c1181c2..ded3156ebee 100644 --- a/pkg/volume/cinder/attacher_test.go +++ b/pkg/volume/cinder/attacher_test.go @@ -417,6 +417,10 @@ func (testcase *testcase) Instances() (cloudprovider.Instances, bool) { return &instances{testcase.instanceID}, true } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error) { + return nil, errors.New("Not implemented") +} + // Implementation of fake cloudprovider.Instances type instances struct { instanceID string diff --git a/pkg/volume/cinder/cinder.go b/pkg/volume/cinder/cinder.go index 3aef51fc63e..817c4db486e 100644 --- a/pkg/volume/cinder/cinder.go +++ b/pkg/volume/cinder/cinder.go @@ -50,6 +50,7 @@ type CinderProvider interface { InstanceID() (string, error) GetAttachmentDiskPath(instanceID string, diskName string) (string, error) DiskIsAttached(diskName, instanceID string) (bool, error) + DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) Instances() (cloudprovider.Instances, bool) } diff --git 
a/pkg/volume/gce_pd/attacher.go b/pkg/volume/gce_pd/attacher.go index c159ec01b88..190e27acaff 100644 --- a/pkg/volume/gce_pd/attacher.go +++ b/pkg/volume/gce_pd/attacher.go @@ -96,6 +96,40 @@ func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, nodeName ty return path.Join(diskByIdPath, diskGooglePrefix+pdName), nil } +func (attacher *gcePersistentDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumePdNameMap := make(map[string]*volume.Spec) + pdNameList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + // If error is occured, skip this volume and move to the next one + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + pdNameList = append(pdNameList, volumeSource.PDName) + volumesAttachedCheck[spec] = true + volumePdNameMap[volumeSource.PDName] = spec + } + attachedResult, err := attacher.gceDisks.DisksAreAttached(pdNameList, nodeName) + if err != nil { + // Log error and continue with attach + glog.Errorf( + "Error checking if PDs (%v) are already attached to current node (%q). 
err=%v", + pdNameList, nodeName, err) + return volumesAttachedCheck, err + } + + for pdName, attached := range attachedResult { + if !attached { + spec := volumePdNameMap[pdName] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", pdName, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *gcePersistentDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { ticker := time.NewTicker(checkSleepDuration) defer ticker.Stop() diff --git a/pkg/volume/gce_pd/attacher_test.go b/pkg/volume/gce_pd/attacher_test.go index 6a80f93b2f2..2ac3c94250e 100644 --- a/pkg/volume/gce_pd/attacher_test.go +++ b/pkg/volume/gce_pd/attacher_test.go @@ -353,6 +353,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam return expected.isAttached, expected.ret } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + return nil, errors.New("Not implemented") +} + func (testcase *testcase) CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error { return errors.New("Not implemented") } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index f16e83da037..8ccc405a8d1 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -423,6 +423,12 @@ func (fv *FakeVolume) Detach(deviceMountPath string, nodeName types.NodeName) er return nil } +func (fv *FakeVolume) VolumesAreAttached(spec []*Spec, nodeName types.NodeName) (map[*Spec]bool, error) { + fv.Lock() + defer fv.Unlock() + return nil, nil +} + func (fv *FakeVolume) GetDetachCallCount() int { fv.RLock() defer fv.RUnlock() diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 266b6cdf054..c992f84b051 100644 --- 
a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -38,6 +38,9 @@ import ( const ( // emptyUniquePodName is a UniquePodName for empty string. emptyUniquePodName types.UniquePodName = types.UniquePodName("") + + // emptyUniqueVolumeName is a UniqueVolumeName for empty string + emptyUniqueVolumeName api.UniqueVolumeName = api.UniqueVolumeName("") ) // NestedPendingOperations defines the supported set of operations. @@ -151,10 +154,16 @@ func (grm *nestedPendingOperations) IsOperationPending( return false } +// This is an internal function and caller should acquire and release the lock func (grm *nestedPendingOperations) isOperationExists( volumeName api.UniqueVolumeName, podName types.UniquePodName) (bool, int) { + // If volumeName is empty, operation can be executed concurrently + if volumeName == emptyUniqueVolumeName { + return false, -1 + } + for previousOpIndex, previousOp := range grm.operations { if previousOp.volumeName != volumeName { // No match, keep searching diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 2e15b8d39a3..3651a353606 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -64,6 +64,13 @@ type OperationExecutor interface { // It then updates the actual state of the world to reflect that. AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error + // VerifyVolumesAreAttached verifies the given list of volumes to see whether they are still attached to the node. + // If any volume is not attached right now, it will update the actual state of the world to reflect that. + // Note that this operation could be operated concurrently with other attach/detach operations. 
+ // In theory (but very unlikely in practise), race condition among these operations might mark volume as detached + // even if it is attached. But reconciler can correct this in a short period of time. + VerifyVolumesAreAttached(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error + // DetachVolume detaches the volume from the node specified in // volumeToDetach, and updates the actual state of the world to reflect // that. If verifySafeToDetach is set, a call is made to the fetch the node @@ -397,6 +404,19 @@ func (oe *operationExecutor) DetachVolume( volumeToDetach.VolumeName, "" /* podName */, detachFunc) } +func (oe *operationExecutor) VerifyVolumesAreAttached( + attachedVolumes []AttachedVolume, + nodeName types.NodeName, + actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { + volumesAreAttachedFunc, err := + oe.generateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld) + if err != nil { + return err + } + // Give an empty UniqueVolumeName so that this operation could be executed concurrently. 
+	return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc)
+}
+
 func (oe *operationExecutor) MountVolume(
 	waitForAttachTimeout time.Duration,
 	volumeToMount VolumeToMount,
@@ -465,6 +485,85 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
 		volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc)
 }
 
+func (oe *operationExecutor) generateVolumesAreAttachedFunc(
+	attachedVolumes []AttachedVolume,
+	nodeName types.NodeName,
+	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
+
+	// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong
+	// to this type of plugin
+	volumesPerPlugin := make(map[string][]*volume.Spec)
+	// volumeSpecMap maps from a volume spec to its unique volumeName which will be used
+	// when calling MarkVolumeAsDetached
+	volumeSpecMap := make(map[*volume.Spec]api.UniqueVolumeName)
+	// Iterate each volume spec and put them into a map index by the pluginName
+	for _, volumeAttached := range attachedVolumes {
+		volumePlugin, err :=
+			oe.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
+		if err != nil || volumePlugin == nil {
+			glog.Errorf(
+				"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
+				volumeAttached.VolumeName,
+				volumeAttached.VolumeSpec.Name(),
+				volumeAttached.NodeName,
+				err)
+			// Skip this volume: volumePlugin may be nil here, and dereferencing it below would panic.
+			continue
+		}
+		volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
+		if !pluginExists {
+			volumeSpecList = []*volume.Spec{}
+		}
+		volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
+		volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
+		volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
+	}
+
+	return func() error {
+
+		// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check
+		for pluginName, volumesSpecs := range volumesPerPlugin {
+			attachableVolumePlugin, err :=
+				oe.volumePluginMgr.FindAttachablePluginByName(pluginName)
+			if err != nil || attachableVolumePlugin == nil {
+				glog.Errorf(
+					"VolumesAreAttached.FindAttachablePluginByName failed for plugin %q with: %v",
+					pluginName,
+					err)
+				continue
+			}
+
+			volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
+			if newAttacherErr != nil {
+				glog.Errorf(
+					"VolumesAreAttached failed for getting plugin %q with: %v",
+					pluginName,
+					newAttacherErr)
+				continue
+			}
+
+			attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName)
+			if areAttachedErr != nil {
+				glog.Errorf(
+					"VolumesAreAttached failed for checking on node %q with: %v",
+					nodeName,
+					areAttachedErr)
+				continue
+			}
+
+			for spec, check := range attached {
+				if !check {
+					actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName)
+					glog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.",
+						volumeSpecMap[spec], spec.Name(), nodeName)
+				}
+			}
+		}
+		return nil
+	}, nil
+}
+
 func (oe *operationExecutor) generateAttachVolumeFunc(
 	volumeToAttach VolumeToAttach,
 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go
index 13ff21414fa..54081c54534 100644
--- a/pkg/volume/volume.go
+++ b/pkg/volume/volume.go
@@ -146,6 +146,11 @@ type Attacher interface {
 	// node.
 	Attach(spec *Spec, nodeName types.NodeName) (string, error)
 
+	// VolumesAreAttached checks whether the listed volumes are still attached to
+	// the specified node. It returns a map from each volume spec to the check result.
+	// If an error occurs during the check, the error is returned.
+	VolumesAreAttached(specs []*Spec, nodeName types.NodeName) (map[*Spec]bool, error)
+
 	// WaitForAttach blocks until the device is attached to this
 	// node.
If it successfully attaches, the path to the device // is returned. Otherwise, if the device does not attach after diff --git a/pkg/volume/vsphere_volume/attacher.go b/pkg/volume/vsphere_volume/attacher.go index 49811987532..d722b17eafc 100644 --- a/pkg/volume/vsphere_volume/attacher.go +++ b/pkg/volume/vsphere_volume/attacher.go @@ -84,6 +84,39 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No return path.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil } +func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { + volumesAttachedCheck := make(map[*volume.Spec]bool) + volumeSpecMap := make(map[string]*volume.Spec) + volumePathList := []string{} + for _, spec := range specs { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err) + continue + } + + volumePathList = append(volumePathList, volumeSource.VolumePath) + volumesAttachedCheck[spec] = true + volumeSpecMap[volumeSource.VolumePath] = spec + } + attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathList, nodeName) + if err != nil { + glog.Errorf( + "Error checking if volumes (%v) are attached to current node (%q). 
err=%v", + volumePathList, nodeName, err) + return volumesAttachedCheck, err + } + + for volumePath, attached := range attachedResult { + if !attached { + spec := volumeSpecMap[volumePath] + volumesAttachedCheck[spec] = false + glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumePath, spec.Name()) + } + } + return volumesAttachedCheck, nil +} + func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { volumeSource, _, err := getVolumeSource(spec) if err != nil { diff --git a/pkg/volume/vsphere_volume/attacher_test.go b/pkg/volume/vsphere_volume/attacher_test.go index ec2b657dafd..e029b36a2e4 100644 --- a/pkg/volume/vsphere_volume/attacher_test.go +++ b/pkg/volume/vsphere_volume/attacher_test.go @@ -308,6 +308,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam return expected.isAttached, expected.ret } +func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + return nil, errors.New("Not implemented") +} + func (testcase *testcase) CreateVolume(volumeOptions *vsphere.VolumeOptions) (volumePath string, err error) { return "", errors.New("Not implemented") } diff --git a/plugin/pkg/admission/persistentvolume/label/admission_test.go b/plugin/pkg/admission/persistentvolume/label/admission_test.go index f2a848955e2..edd33b34bf6 100644 --- a/plugin/pkg/admission/persistentvolume/label/admission_test.go +++ b/plugin/pkg/admission/persistentvolume/label/admission_test.go @@ -62,6 +62,10 @@ func (c *mockVolumes) DiskIsAttached(volumeName string, nodeName types.NodeName) return false, fmt.Errorf("not implemented") } +func (c *mockVolumes) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) { + return nil, fmt.Errorf("not implemented") +} + func mockVolumeFailure(err error) *mockVolumes { return &mockVolumes{volumeLabelsError: err} }