Merge pull request #34859 from jingxu97/syncAttach-10-15

Automatic merge from submit-queue

Add sync state loop in master's volume reconciler

In the master's volume reconciler, the information about which volumes are
attached to which nodes is cached in the actual state of the world. However,
this information can become stale when a node is terminated, because its
volumes are detached automatically. In that situation the reconciler assumes
the volumes are still attached and does not issue attach operations when the
node comes back, so pods created on that node fail to mount.
This PR adds logic to periodically verify the attached volumes recorded in
the actual state cache. If a volume is no longer attached to its node, the
actual state is updated to reflect that, and the reconciler then takes
whatever action is needed.
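
In sketch form, the new periodic check hooks into the existing reconciliation loop roughly as follows; this is a simplified restatement of the reconciler.go hunk further down, with reconcile and sync reduced to stubs:

package reconcilersketch

import "time"

type reconciler struct {
	syncDuration   time.Duration
	timeOfLastSync time.Time
}

// reconciliationLoopFunc runs the normal attach/detach reconciliation and,
// once every syncDuration, re-verifies that volumes recorded as attached in
// the actual state of the world are really still attached.
func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {
		rc.reconcile()
		if time.Since(rc.timeOfLastSync) > rc.syncDuration {
			rc.sync()
		}
	}
}

func (rc *reconciler) reconcile() { /* attach/detach reconciliation elided */ }

func (rc *reconciler) sync() {
	defer func() { rc.timeOfLastSync = time.Now() }()
	// syncStates() would call VerifyVolumesAreAttached for each node here.
}
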
To avoid issuing many concurrent operations against the cloud provider, this
PR adds a batch operation that checks whether a list of volumes is attached
to a node, instead of making one request per volume.
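
For illustration, the batch result can be consumed as in this minimal sketch; DisksAreAttached mirrors the cloud-provider method added in the diff below, while the verifyNode helper and the plain string node name are assumptions made only for this example:

package batchsketch

import "fmt"

// batchVolumes is the batch-check shape added to each cloud provider: a single
// call reports, for every disk name, whether it is still attached to the node.
type batchVolumes interface {
	DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error)
}

// verifyNode issues one batch request for all disks believed to be attached to
// a node and reports the ones that are no longer attached.
func verifyNode(cloud batchVolumes, nodeName string, diskNames []string) error {
	attached, err := cloud.DisksAreAttached(diskNames, nodeName)
	if err != nil {
		return fmt.Errorf("batch attach check for node %q failed: %v", nodeName, err)
	}
	for diskName, isAttached := range attached {
		if !isAttached {
			// In the controller this is where the volume would be marked as
			// detached in the actual state of the world.
			fmt.Printf("disk %q is no longer attached to node %q\n", diskName, nodeName)
		}
	}
	return nil
}
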
Kubernetes Submit Queue 2016-10-28 18:33:29 -07:00 committed by GitHub
commit 3e7172d49e
26 changed files with 679 additions and 108 deletions

View File

@ -321,6 +321,9 @@ type Volumes interface {
// Check if the volume is already attached to the node with the specified NodeName
DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error)
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)
}
// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@ -1774,6 +1777,41 @@ func (c *Cloud) DiskIsAttached(diskName string, nodeName types.NodeName) (bool,
return false, nil
}
func (c *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
awsInstance, err := c.getAwsInstance(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
nodeName,
diskNames)
return attached, nil
}
return attached, err
}
info, err := awsInstance.describeInstance()
if err != nil {
return attached, err
}
for _, blockDevice := range info.BlockDeviceMappings {
volumeId := aws.StringValue(blockDevice.Ebs.VolumeId)
for _, diskName := range diskNames {
if volumeId == diskName {
// Disk is still attached to node
attached[diskName] = true
}
}
}
return attached, nil
}
// Gets the current load balancer state
func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) {
request := &elb.DescribeLoadBalancersInput{}

View File

@ -75,6 +75,34 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l
return err
}
// DisksAreAttached checks if a list of volumes are attached to the node with the specified NodeName
func (az *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
vm, exists, err := az.getVirtualMachine(nodeName)
if !exists {
// if the host doesn't exist, assume the disks are not attached to it
glog.Warningf("Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.",
nodeName, diskNames)
return attached, nil
} else if err != nil {
return attached, err
}
disks := *vm.Properties.StorageProfile.DataDisks
for _, disk := range disks {
for _, diskName := range diskNames {
if disk.Name != nil && diskName != "" && *disk.Name == diskName {
attached[diskName] = true
}
}
}
return attached, nil
}
// DetachDiskByName detaches a vhd from host
// the vhd can be identified by diskName or diskURI
func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {

View File

@ -133,6 +133,10 @@ type Disks interface {
// DiskIsAttached checks if a disk is attached to the node with the specified NodeName.
DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error)
// DisksAreAttached is a batch function to check if a list of disks are attached
// to the node with the specified NodeName.
DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)
// CreateDisk creates a new PD with given properties. Tags are serialized
// as JSON into Description field.
CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error
@ -2651,6 +2655,38 @@ func (gce *GCECloud) DiskIsAttached(diskName string, nodeName types.NodeName) (b
return false, nil
}
func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
instanceName := mapNodeNameToInstanceName(nodeName)
instance, err := gce.getInstanceByName(instanceName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DisksAreAttached will assume PD %v are not attached to it.",
instanceName,
diskNames)
return attached, nil
}
return attached, err
}
for _, instanceDisk := range instance.Disks {
for _, diskName := range diskNames {
if instanceDisk.DeviceName == diskName {
// Disk is still attached to node
attached[diskName] = true
}
}
}
return attached, nil
}
// Returns a gceDisk for the disk, if it is found in the specified zone.
// If not found, returns (nil, nil)
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) {

View File

@ -240,6 +240,24 @@ func (os *OpenStack) DiskIsAttached(diskName, instanceID string) (bool, error) {
return false, nil
}
// DisksAreAttached queries whether the given volumes are attached to a compute instance
func (os *OpenStack) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
for _, diskName := range diskNames {
disk, err := os.getVolume(diskName)
if err != nil {
continue
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] {
attached[diskName] = true
}
}
return attached, nil
}
// diskIsUsed returns true if the disk is attached to any node.
func (os *OpenStack) diskIsUsed(diskName string) (bool, error) {
disk, err := os.getVolume(diskName)

View File

@ -660,3 +660,24 @@ func (rs *Rackspace) DiskIsAttached(diskName, instanceID string) (bool, error) {
}
return false, nil
}
// DisksAreAttached queries whether the given volumes are attached to a compute instance
func (rs *Rackspace) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
var returnedErr error
for _, diskName := range diskNames {
result, err := rs.DiskIsAttached(diskName, instanceID)
if err != nil {
returnedErr = fmt.Errorf("Error in checking disk %q attached: %v \n %v", diskName, err, returnedErr)
continue
}
if result {
attached[diskName] = true
}
}
return attached, returnedErr
}

View File

@ -150,6 +150,10 @@ type Volumes interface {
// Assumption: If node doesn't exist, disk is not attached to the node.
DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (bool, error)
// DisksAreAttached checks whether the given disks are attached to the given node.
// Assumption: If node doesn't exist, disks are not attached to the node.
DisksAreAttached(volPath []string, nodeName k8stypes.NodeName) (map[string]bool, error)
// CreateVolume creates a new vmdk with specified parameters.
CreateVolume(volumeOptions *VolumeOptions) (volumePath string, err error)
@ -968,6 +972,63 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b
return attached, err
}
// DisksAreAttached returns whether the disks are attached to the VM, using controllers supported by the plugin.
func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create vSphere client
attached := make(map[string]bool)
for _, volPath := range volPaths {
attached[volPath] = false
}
err := vSphereLogin(vs, ctx)
if err != nil {
glog.Errorf("Failed to login into vCenter, err: %v", err)
return attached, err
}
// Find the VM to check the disks on
var vSphereInstance string
if nodeName == "" {
vSphereInstance = vs.localInstanceID
nodeName = vmNameToNodeName(vSphereInstance)
} else {
vSphereInstance = nodeNameToVMName(nodeName)
}
nodeExist, err := vs.NodeExists(vs.client, nodeName)
if err != nil {
glog.Errorf("Failed to check whether node exist. err: %s.", err)
return attached, err
}
if !nodeExist {
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume vmdk %v are not attached to it.",
vSphereInstance,
volPaths)
return attached, nil
}
// Get VM device list
_, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, vs.client, vSphereInstance)
if err != nil {
glog.Errorf("Failed to get VM devices for VM %#q. err: %s", vSphereInstance, err)
return attached, err
}
for _, volPath := range volPaths {
result, _ := checkDiskAttached(volPath, vmDevices, dc, vs.client)
if result {
attached[volPath] = true
}
}
return attached, err
}
func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) {
virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client)
if err != nil {
@ -978,7 +1039,7 @@ func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *o
return false, err
}
for _, controllerType := range supportedSCSIControllerType {
controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client)
controllerkey, _ := getControllerKey(controllerType, vmdevices)
if controllerkey == virtualDiskControllerKey {
return true, nil
}
@ -1010,7 +1071,7 @@ func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceL
// Returns key of the controller.
// Key is unique id that distinguishes one device from other devices in the same virtual machine.
func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) {
func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList) (int32, error) {
for _, device := range vmDevices {
devType := vmDevices.Type(device)
if devType == scsiType {

View File

@ -56,6 +56,10 @@ const (
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute
// reconcilerSyncDuration is the amount of time the reconciler sync states loop
// waits between successive executions
reconcilerSyncDuration time.Duration = 5 * time.Second
)
// AttachDetachController defines the operations supported by this controller.
@ -122,6 +126,7 @@ func NewAttachDetachController(
adc.reconciler = reconciler.NewReconciler(
reconcilerLoopPeriod,
reconcilerMaxWaitForUnmountDuration,
reconcilerSyncDuration,
adc.desiredStateOfWorld,
adc.actualStateOfWorld,
adc.attacherDetacher,

View File

@ -106,6 +106,8 @@ type ActualStateOfWorld interface {
// based on the current actual state of the world.
GetAttachedVolumesForNode(nodeName types.NodeName) []AttachedVolume
GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume
// GetVolumesToReportAttached returns a map containing the set of nodes for
// which the VolumesAttached Status field in the Node API object should be
// updated. The key in this map is the name of the node to update and the
@ -541,6 +543,25 @@ func (asw *actualStateOfWorld) GetAttachedVolumesForNode(
return attachedVolumes
}
func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume {
asw.RLock()
defer asw.RUnlock()
attachedVolumesPerNode := make(map[types.NodeName][]operationexecutor.AttachedVolume)
for _, volumeObj := range asw.attachedVolumes {
for nodeName, nodeObj := range volumeObj.nodesAttachedTo {
volumes, exists := attachedVolumesPerNode[nodeName]
if !exists {
volumes = []operationexecutor.AttachedVolume{}
}
volumes = append(volumes, getAttachedVolume(&volumeObj, &nodeObj).AttachedVolume)
attachedVolumesPerNode[nodeName] = volumes
}
}
return attachedVolumesPerNode
}
func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]api.AttachedVolume {
asw.RLock()
defer asw.RUnlock()

View File

@ -56,6 +56,7 @@ type Reconciler interface {
func NewReconciler(
loopPeriod time.Duration,
maxWaitForUnmountDuration time.Duration,
syncDuration time.Duration,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
attacherDetacher operationexecutor.OperationExecutor,
@ -63,20 +64,24 @@ func NewReconciler(
return &reconciler{
loopPeriod: loopPeriod,
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
syncDuration: syncDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
attacherDetacher: attacherDetacher,
nodeStatusUpdater: nodeStatusUpdater,
timeOfLastSync: time.Now(),
}
}
type reconciler struct {
loopPeriod time.Duration
maxWaitForUnmountDuration time.Duration
syncDuration time.Duration
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
attacherDetacher operationexecutor.OperationExecutor
nodeStatusUpdater statusupdater.NodeStatusUpdater
timeOfLastSync time.Time
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
@ -85,107 +90,135 @@ func (rc *reconciler) Run(stopCh <-chan struct{}) {
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted",
attachedVolume.VolumeName,
attachedVolume.NodeName)
continue
}
// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
// Skip detaching this volume if unable to update node status
glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ",
attachedVolume.VolumeName,
attachedVolume.NodeName,
err)
continue
}
// Trigger detach volume, which requires verifying the safe-to-detach step
// If timeout is true, skip verifySafeToDetach check
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
verifySafeToDetach := !timeout
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if !timeout {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
} else {
glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching",
attachedVolume.VolumeName,
attachedVolume.NodeName,
rc.maxWaitForUnmountDuration)
}
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
}
}
// Ensure volumes that should be attached are attached.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(5).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(5).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec.Name(),
volumeToAttach.NodeName,
err)
}
}
}
// Update Node Status
err := rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
glog.Infof("UpdateNodeStatuses failed with: %v", err)
rc.reconcile()
// The reconciler periodically checks whether the volumes recorded as attached in the actual state
// of the world are still attached to the node and updates the actual state if they are not.
if time.Since(rc.timeOfLastSync) > rc.syncDuration {
rc.sync()
}
}
}
func (rc *reconciler) sync() {
defer rc.updateSyncTime()
rc.syncStates()
}
func (rc *reconciler) updateSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) syncStates() {
volumesPerNode := rc.actualStateOfWorld.GetAttachedVolumesPerNode()
for nodeName, volumes := range volumesPerNode {
err := rc.attacherDetacher.VerifyVolumesAreAttached(volumes, nodeName, rc.actualStateOfWorld)
if err != nil {
glog.Errorf("Error in syncing states for volumes: %v", err)
}
}
}
func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
continue
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
glog.V(12).Infof("Cannot trigger detach for volume %q on node %q because volume is still mounted",
attachedVolume.VolumeName,
attachedVolume.NodeName)
continue
}
// Before triggering volume detach, mark volume as detached and update the node status
// If it fails to update node status, skip detach volume
rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
// Skip detaching this volume if unable to update node status
glog.Errorf("UpdateNodeStatuses failed while attempting to report volume %q as attached to node %q with: %v ",
attachedVolume.VolumeName,
attachedVolume.NodeName,
err)
continue
}
// Trigger detach volume, which requires verifying the safe-to-detach step
// If timeout is true, skip verifySafeToDetach check
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
verifySafeToDetach := !timeout
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if !timeout {
glog.Infof("Started DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
} else {
glog.Infof("Started DetachVolume for volume %q from node %q. This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching",
attachedVolume.VolumeName,
attachedVolume.NodeName,
rc.maxWaitForUnmountDuration)
}
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.DetachVolume failed to start for volume %q (spec.Name: %q) from node %q with err: %v",
attachedVolume.VolumeName,
attachedVolume.VolumeSpec.Name(),
attachedVolume.NodeName,
err)
}
}
}
// Ensure volumes that should be attached are attached.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
if rc.actualStateOfWorld.VolumeNodeExists(
volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
glog.V(1).Infof("Volume %q/Node %q is attached--touching.", volumeToAttach.VolumeName, volumeToAttach.NodeName)
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(1).Infof("Attempting to start AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof("Started AttachVolume for volume %q to node %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
glog.Errorf(
"operationExecutor.AttachVolume failed to start for volume %q (spec.Name: %q) to node %q with err: %v",
volumeToAttach.VolumeName,
volumeToAttach.VolumeSpec.Name(),
volumeToAttach.NodeName,
err)
}
}
}
// Update Node Status
err := rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
glog.Infof("UpdateNodeStatuses failed with: %v", err)
}
}

View File

@ -35,6 +35,7 @@ import (
const (
reconcilerLoopPeriod time.Duration = 0 * time.Millisecond
syncLoopPeriod time.Duration = 100 * time.Minute
maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
resyncPeriod time.Duration = 5 * time.Minute
)
@ -55,7 +56,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, nodeInformer, asw)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
// Act
ch := make(chan struct{})
@ -83,7 +84,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@ -129,7 +130,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@ -196,7 +197,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@ -263,7 +264,7 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder)
nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := api.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)

View File

@ -77,6 +77,40 @@ func (attacher *awsElasticBlockStoreAttacher) Attach(spec *volume.Spec, nodeName
return devicePath, nil
}
func (attacher *awsElasticBlockStoreAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumeIDList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumeIDList = append(volumeIDList, volumeSource.VolumeID)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.VolumeID] = spec
}
attachedResult, err := attacher.awsVolumes.DisksAreAttached(volumeIDList, nodeName)
if err != nil {
// Log the error and return the results collected so far (volumes assumed attached)
glog.Errorf(
"Error checking if volumes (%v) is already attached to current node (%q). err=%v",
volumeIDList, nodeName, err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *awsElasticBlockStoreAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {

View File

@ -317,6 +317,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam
return expected.isAttached, expected.ret
}
func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
return nil, errors.New("Not implemented")
}
func (testcase *testcase) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName string, err error) {
return "", errors.New("Not implemented")
}

View File

@ -115,6 +115,40 @@ func (attacher *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.Node
return strconv.Itoa(int(lun)), err
}
func (attacher *azureDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumeIDList := []string{}
for _, spec := range specs {
volumeSource, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumeIDList = append(volumeIDList, volumeSource.DiskName)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.DiskName] = spec
}
attachedResult, err := attacher.azureProvider.DisksAreAttached(volumeIDList, nodeName)
if err != nil {
// Log the error and return the results collected so far (volumes assumed attached)
glog.Errorf(
"Error checking if volumes (%v) are attached to current node (%q). err=%v",
volumeIDList, nodeName, err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
}
}
return volumesAttachedCheck, nil
}
// WaitForAttach runs on the node to detect if the volume (referenced by LUN) is attached. If attached, the device path is returned
func (attacher *azureDiskAttacher) WaitForAttach(spec *volume.Spec, lunStr string, timeout time.Duration) (string, error) {
volumeSource, err := getVolumeSource(spec)

View File

@ -53,6 +53,8 @@ type azureCloudProvider interface {
AttachDisk(diskName, diskUri string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error
// Detaches the disk, identified by disk name or uri, from the host machine.
DetachDiskByName(diskName, diskUri string, nodeName types.NodeName) error
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)
// Get the LUN number of the disk that is attached to the host
GetDiskLun(diskName, diskUri string, nodeName types.NodeName) (int32, error)
// Get the next available LUN number to attach a new VHD

View File

@ -109,6 +109,40 @@ func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.Nod
return devicePath, err
}
func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumeIDList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumeIDList = append(volumeIDList, volumeSource.VolumeID)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.VolumeID] = spec
}
attachedResult, err := attacher.cinderProvider.DisksAreAttached(volumeIDList, string(nodeName))
if err != nil {
// Log the error and return the results collected so far (volumes assumed attached)
glog.Errorf(
"Error checking if Volumes (%v) are already attached to current node (%q). Will continue and try attach anyway. err=%v",
volumeIDList, nodeName, err)
return volumesAttachedCheck, err
}
for volumeID, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumeID]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumeID, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *cinderDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {

View File

@ -417,6 +417,10 @@ func (testcase *testcase) Instances() (cloudprovider.Instances, bool) {
return &instances{testcase.instanceID}, true
}
func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error) {
return nil, errors.New("Not implemented")
}
// Implementation of fake cloudprovider.Instances
type instances struct {
instanceID string

View File

@ -50,6 +50,7 @@ type CinderProvider interface {
InstanceID() (string, error)
GetAttachmentDiskPath(instanceID string, diskName string) (string, error)
DiskIsAttached(diskName, instanceID string) (bool, error)
DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error)
Instances() (cloudprovider.Instances, bool)
}

View File

@ -96,6 +96,40 @@ func (attacher *gcePersistentDiskAttacher) Attach(spec *volume.Spec, nodeName ty
return path.Join(diskByIdPath, diskGooglePrefix+pdName), nil
}
func (attacher *gcePersistentDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumePdNameMap := make(map[string]*volume.Spec)
pdNameList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
// If an error occurs, skip this volume and move to the next one
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
pdNameList = append(pdNameList, volumeSource.PDName)
volumesAttachedCheck[spec] = true
volumePdNameMap[volumeSource.PDName] = spec
}
attachedResult, err := attacher.gceDisks.DisksAreAttached(pdNameList, nodeName)
if err != nil {
// Log the error and return the results collected so far (volumes assumed attached)
glog.Errorf(
"Error checking if PDs (%v) are already attached to current node (%q). err=%v",
pdNameList, nodeName, err)
return volumesAttachedCheck, err
}
for pdName, attached := range attachedResult {
if !attached {
spec := volumePdNameMap[pdName]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", pdName, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *gcePersistentDiskAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()

View File

@ -353,6 +353,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam
return expected.isAttached, expected.ret
}
func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
return nil, errors.New("Not implemented")
}
func (testcase *testcase) CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {
return errors.New("Not implemented")
}

View File

@ -423,6 +423,12 @@ func (fv *FakeVolume) Detach(deviceMountPath string, nodeName types.NodeName) er
return nil
}
func (fv *FakeVolume) VolumesAreAttached(spec []*Spec, nodeName types.NodeName) (map[*Spec]bool, error) {
fv.Lock()
defer fv.Unlock()
return nil, nil
}
func (fv *FakeVolume) GetDetachCallCount() int {
fv.RLock()
defer fv.RUnlock()

View File

@ -38,6 +38,9 @@ import (
const (
// emptyUniquePodName is a UniquePodName for empty string.
emptyUniquePodName types.UniquePodName = types.UniquePodName("")
// emptyUniqueVolumeName is a UniqueVolumeName for empty string
emptyUniqueVolumeName api.UniqueVolumeName = api.UniqueVolumeName("")
)
// NestedPendingOperations defines the supported set of operations.
@ -151,10 +154,16 @@ func (grm *nestedPendingOperations) IsOperationPending(
return false
}
// This is an internal function and caller should acquire and release the lock
func (grm *nestedPendingOperations) isOperationExists(
volumeName api.UniqueVolumeName,
podName types.UniquePodName) (bool, int) {
// If volumeName is empty, operation can be executed concurrently
if volumeName == emptyUniqueVolumeName {
return false, -1
}
for previousOpIndex, previousOp := range grm.operations {
if previousOp.volumeName != volumeName {
// No match, keep searching

View File

@ -64,6 +64,13 @@ type OperationExecutor interface {
// It then updates the actual state of the world to reflect that.
AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// VerifyVolumesAreAttached verifies the given list of volumes to see whether they are still attached to the node.
// If any volume is no longer attached, it updates the actual state of the world to reflect that.
// Note that this operation may run concurrently with other attach/detach operations.
// In theory (though very unlikely in practice), a race among these operations could mark a volume as detached
// even though it is still attached, but the reconciler will correct this within a short period of time.
VerifyVolumesAreAttached(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
// DetachVolume detaches the volume from the node specified in
// volumeToDetach, and updates the actual state of the world to reflect
// that. If verifySafeToDetach is set, a call is made to the fetch the node
@ -397,6 +404,19 @@ func (oe *operationExecutor) DetachVolume(
volumeToDetach.VolumeName, "" /* podName */, detachFunc)
}
func (oe *operationExecutor) VerifyVolumesAreAttached(
attachedVolumes []AttachedVolume,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
volumesAreAttachedFunc, err :=
oe.generateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
if err != nil {
return err
}
// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc)
}
func (oe *operationExecutor) MountVolume(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
@ -465,6 +485,83 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume(
volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc)
}
func (oe *operationExecutor) generateVolumesAreAttachedFunc(
attachedVolumes []AttachedVolume,
nodeName types.NodeName,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {
// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong
// to this type of plugin
volumesPerPlugin := make(map[string][]*volume.Spec)
// volumeSpecMap maps from a volume spec to its unique volumeName which will be used
// when calling MarkVolumeAsDetached
volumeSpecMap := make(map[*volume.Spec]api.UniqueVolumeName)
// Iterate over each volume spec and put it into a map indexed by plugin name
for _, volumeAttached := range attachedVolumes {
volumePlugin, err :=
oe.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
if err != nil || volumePlugin == nil {
glog.Errorf(
"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
volumeAttached.VolumeName,
volumeAttached.VolumeSpec.Name(),
volumeAttached.NodeName,
err)
// Skip this volume; otherwise a nil volumePlugin would be dereferenced below.
continue
}
volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
if !pluginExists {
volumeSpecList = []*volume.Spec{}
}
volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
}
return func() error {
// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check
// whether the volumes are still attached.
for pluginName, volumesSpecs := range volumesPerPlugin {
attachableVolumePlugin, err :=
oe.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
glog.Errorf(
"VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v",
pluginName,
err)
continue
}
volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
if newAttacherErr != nil {
glog.Errorf(
"VolumesAreAttached failed for getting plugin %q with: %v",
pluginName,
newAttacherErr)
continue
}
attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName)
if areAttachedErr != nil {
glog.Errorf(
"VolumesAreAttached failed for checking on node %q with: %v",
nodeName,
areAttachedErr)
continue
}
for spec, check := range attached {
if !check {
actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName)
glog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.",
volumeSpecMap[spec], spec.Name())
}
}
}
return nil
}, nil
}
func (oe *operationExecutor) generateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) {

View File

@ -146,6 +146,11 @@ type Attacher interface {
// node.
Attach(spec *Spec, nodeName types.NodeName) (string, error)
// VolumesAreAttached checks whether the listed volumes are still attached to the specified
// node. It returns a map from each volume spec to the result of the check.
// If an error occurs during the check, the error is returned.
VolumesAreAttached(specs []*Spec, nodeName types.NodeName) (map[*Spec]bool, error)
// WaitForAttach blocks until the device is attached to this
// node. If it successfully attaches, the path to the device
// is returned. Otherwise, if the device does not attach after

View File

@ -84,6 +84,39 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No
return path.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil
}
func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[*volume.Spec]bool)
volumeSpecMap := make(map[string]*volume.Spec)
volumePathList := []string{}
for _, spec := range specs {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
continue
}
volumePathList = append(volumePathList, volumeSource.VolumePath)
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.VolumePath] = spec
}
attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathList, nodeName)
if err != nil {
glog.Errorf(
"Error checking if volumes (%v) are attached to current node (%q). err=%v",
volumePathList, nodeName, err)
return volumesAttachedCheck, err
}
for volumePath, attached := range attachedResult {
if !attached {
spec := volumeSpecMap[volumePath]
volumesAttachedCheck[spec] = false
glog.V(2).Infof("VolumesAreAttached: check volume %q (specName: %q) is no longer attached", volumePath, spec.Name())
}
}
return volumesAttachedCheck, nil
}
func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) {
volumeSource, _, err := getVolumeSource(spec)
if err != nil {

View File

@ -308,6 +308,10 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam
return expected.isAttached, expected.ret
}
func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
return nil, errors.New("Not implemented")
}
func (testcase *testcase) CreateVolume(volumeOptions *vsphere.VolumeOptions) (volumePath string, err error) {
return "", errors.New("Not implemented")
}

View File

@ -62,6 +62,10 @@ func (c *mockVolumes) DiskIsAttached(volumeName string, nodeName types.NodeName)
return false, fmt.Errorf("not implemented")
}
func (c *mockVolumes) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
return nil, fmt.Errorf("not implemented")
}
func mockVolumeFailure(err error) *mockVolumes {
return &mockVolumes{volumeLabelsError: err}
}