Add sync state loop in master's volume reconciler

In the master's volume reconciler, the information about which volumes
are attached to nodes is cached in the actual state of the world.
However, this information can become stale when a node is terminated,
since its volumes are then detached automatically. In that situation
the reconciler still assumes the volume is attached and will not issue
an attach operation when the node comes back, so pods created on that
node will fail to mount.

This PR adds logic to periodically verify the attached volumes recorded in the actual state cache against the cloud provider. If a volume is no longer attached to the node, the actual state is updated to reflect that, and the reconciler then takes whatever action is needed.
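
A minimal sketch of the shape of that sync, under simplified assumptions: BatchAttachChecker, ActualState, and their methods are hypothetical stand-ins (with types.NodeName reduced to string), not the code in this commit; the real change is wired into the attach/detach controller's reconciler.

package sketch

import "time"

// Hypothetical stand-ins for the controller's real types; this
// illustrates the sync loop's shape, not the commit's actual code.
type BatchAttachChecker interface {
	DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error)
}

type ActualState interface {
	GetAttachedVolumesPerNode() map[string][]string // node name -> volume IDs
	MarkVolumeAsDetached(volumeID, nodeName string)
}

// SyncStates periodically reconciles the cached "attached" view with the
// cloud provider's view, issuing one batched call per node.
func SyncStates(cloud BatchAttachChecker, cache ActualState, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			for node, volumes := range cache.GetAttachedVolumesPerNode() {
				attached, err := cloud.DisksAreAttached(volumes, node)
				if err != nil {
					continue // keep the cached state on transient errors
				}
				for vol, ok := range attached {
					if !ok {
						// The cache was stale: the volume is detached, so
						// correct the actual state; the reconciler will
						// re-attach if the volume is still wanted.
						cache.MarkVolumeAsDetached(vol, node)
					}
				}
			}
		}
	}
}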

To avoid issuing many concurrent operations against the cloud provider,
this PR also adds a batch operation that checks whether a list of
volumes is attached to a node, instead of sending one request per
volume.
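
In call-pattern terms the change looks roughly like the following sketch, again with a hypothetical minimal interface (string in place of types.NodeName); the per-provider DisksAreAttached implementations appear in the diff below.

package sketch

// attachChecker is a hypothetical minimal interface; the real methods
// live on each cloud provider and take types.NodeName.
type attachChecker interface {
	DiskIsAttached(diskName, nodeName string) (bool, error)
	DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error)
}

// Old pattern: one provider request per volume on the node.
func checkPerVolume(c attachChecker, disks []string, node string) map[string]bool {
	attached := make(map[string]bool, len(disks))
	for _, d := range disks {
		ok, err := c.DiskIsAttached(d, node) // N round trips for N disks
		attached[d] = err == nil && ok
	}
	return attached
}

// New pattern: a single batched request covers every volume on the node.
func checkBatched(c attachChecker, disks []string, node string) (map[string]bool, error) {
	return c.DisksAreAttached(disks, node)
}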

More details are explained in PR #33760

commit abbde43374 (parent dbcf413334)
Author: Jing Xu
Date:   2016-10-14 14:21:58 -07:00

26 changed files with 679 additions and 108 deletions

@@ -321,6 +321,9 @@ type Volumes interface {
	// Check if the volume is already attached to the node with the specified NodeName
	DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error)

	// Check if a list of volumes are attached to the node with the specified NodeName
	DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)
}
// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
@@ -1774,6 +1777,41 @@ func (c *Cloud) DiskIsAttached(diskName string, nodeName types.NodeName) (bool,
	return false, nil
}

func (c *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
	attached := make(map[string]bool)
	for _, diskName := range diskNames {
		attached[diskName] = false
	}
	awsInstance, err := c.getAwsInstance(nodeName)
	if err != nil {
		if err == cloudprovider.InstanceNotFound {
			// If instance no longer exists, safe to assume volume is not attached.
			glog.Warningf(
				"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
				nodeName,
				diskNames)
			return attached, nil
		}
		return attached, err
	}
	info, err := awsInstance.describeInstance()
	if err != nil {
		return attached, err
	}
	for _, blockDevice := range info.BlockDeviceMappings {
		volumeId := aws.StringValue(blockDevice.Ebs.VolumeId)
		for _, diskName := range diskNames {
			if volumeId == diskName {
				// Disk is still attached to node
				attached[diskName] = true
			}
		}
	}
	return attached, nil
}

// Gets the current load balancer state
func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) {
	request := &elb.DescribeLoadBalancersInput{}

@@ -75,6 +75,34 @@ func (az *Cloud) AttachDisk(diskName, diskURI string, nodeName types.NodeName, l
	return err
}

// DisksAreAttached checks if a list of volumes are attached to the node with the specified NodeName
func (az *Cloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
	attached := make(map[string]bool)
	for _, diskName := range diskNames {
		attached[diskName] = false
	}
	vm, exists, err := az.getVirtualMachine(nodeName)
	if !exists {
		// if host doesn't exist, no need to detach
		glog.Warningf("Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.",
			nodeName, diskNames)
		return attached, nil
	} else if err != nil {
		return attached, err
	}
	disks := *vm.Properties.StorageProfile.DataDisks
	for _, disk := range disks {
		for _, diskName := range diskNames {
			if disk.Name != nil && diskName != "" && *disk.Name == diskName {
				attached[diskName] = true
			}
		}
	}
	return attached, nil
}

// DetachDiskByName detaches a vhd from host
// the vhd can be identified by diskName or diskURI
func (az *Cloud) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {

@@ -133,6 +133,10 @@ type Disks interface {
	// DiskIsAttached checks if a disk is attached to the node with the specified NodeName.
	DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error)

	// DisksAreAttached is a batch function to check if a list of disks are attached
	// to the node with the specified NodeName.
	DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error)

	// CreateDisk creates a new PD with given properties. Tags are serialized
	// as JSON into Description field.
	CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error
@@ -2651,6 +2655,38 @@ func (gce *GCECloud) DiskIsAttached(diskName string, nodeName types.NodeName) (b
	return false, nil
}

func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
	attached := make(map[string]bool)
	for _, diskName := range diskNames {
		attached[diskName] = false
	}
	instanceName := mapNodeNameToInstanceName(nodeName)
	instance, err := gce.getInstanceByName(instanceName)
	if err != nil {
		if err == cloudprovider.InstanceNotFound {
			// If instance no longer exists, safe to assume volume is not attached.
			glog.Warningf(
				"Instance %q does not exist. DisksAreAttached will assume PD %v are not attached to it.",
				instanceName,
				diskNames)
			return attached, nil
		}
		return attached, err
	}
	for _, instanceDisk := range instance.Disks {
		for _, diskName := range diskNames {
			if instanceDisk.DeviceName == diskName {
				// Disk is still attached to node
				attached[diskName] = true
			}
		}
	}
	return attached, nil
}

// Returns a gceDisk for the disk, if it is found in the specified zone.
// If not found, returns (nil, nil)
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) {

@@ -240,6 +240,24 @@ func (os *OpenStack) DiskIsAttached(diskName, instanceID string) (bool, error) {
	return false, nil
}
// query if a list of volumes are attached to a compute instance
func (os *OpenStack) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
	attached := make(map[string]bool)
	for _, diskName := range diskNames {
		attached[diskName] = false
	}
	for _, diskName := range diskNames {
		disk, err := os.getVolume(diskName)
		if err != nil {
			continue
		}
		if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] {
			attached[diskName] = true
		}
	}
	return attached, nil
}

// diskIsUsed returns true if a disk is attached to any node.
func (os *OpenStack) diskIsUsed(diskName string) (bool, error) {
	disk, err := os.getVolume(diskName)

@@ -660,3 +660,24 @@ func (rs *Rackspace) DiskIsAttached(diskName, instanceID string) (bool, error) {
	}
	return false, nil
}

// query if a list of volumes are attached to a compute instance
func (rs *Rackspace) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
	attached := make(map[string]bool)
	for _, diskName := range diskNames {
		attached[diskName] = false
	}
	var returnedErr error
	for _, diskName := range diskNames {
		result, err := rs.DiskIsAttached(diskName, instanceID)
		if err != nil {
			returnedErr = fmt.Errorf("Error in checking disk %q attached: %v \n %v", diskName, err, returnedErr)
			continue
		}
		if result {
			attached[diskName] = true
		}
	}
	return attached, returnedErr
}

@@ -150,6 +150,10 @@ type Volumes interface {
	// Assumption: If node doesn't exist, disk is not attached to the node.
	DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (bool, error)

	// DisksAreAttached checks if a list of disks are attached to the given node.
	// Assumption: If node doesn't exist, disks are not attached to the node.
	DisksAreAttached(volPath []string, nodeName k8stypes.NodeName) (map[string]bool, error)

	// CreateVolume creates a new vmdk with specified parameters.
	CreateVolume(volumeOptions *VolumeOptions) (volumePath string, err error)
@@ -968,6 +972,63 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b
	return attached, err
}

// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin.
func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create vSphere client
	attached := make(map[string]bool)
	for _, volPath := range volPaths {
		attached[volPath] = false
	}
	err := vSphereLogin(vs, ctx)
	if err != nil {
		glog.Errorf("Failed to login into vCenter, err: %v", err)
		return attached, err
	}

	// Find VM to detach disk from
	var vSphereInstance string
	if nodeName == "" {
		vSphereInstance = vs.localInstanceID
		nodeName = vmNameToNodeName(vSphereInstance)
	} else {
		vSphereInstance = nodeNameToVMName(nodeName)
	}
	nodeExist, err := vs.NodeExists(vs.client, nodeName)
	if err != nil {
		glog.Errorf("Failed to check whether node exist. err: %s.", err)
		return attached, err
	}
	if !nodeExist {
		glog.Warningf(
			"Node %q does not exist. DisksAreAttached will assume vmdk %v are not attached to it.",
			vSphereInstance,
			volPaths)
		return attached, nil
	}

	// Get VM device list
	_, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, vs.client, vSphereInstance)
	if err != nil {
		glog.Errorf("Failed to get VM devices for VM %#q. err: %s", vSphereInstance, err)
		return attached, err
	}

	for _, volPath := range volPaths {
		result, _ := checkDiskAttached(volPath, vmDevices, dc, vs.client)
		if result {
			attached[volPath] = true
		}
	}
	return attached, err
}

func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) {
	virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client)
	if err != nil {
@@ -978,7 +1039,7 @@ func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *o
		return false, err
	}
	for _, controllerType := range supportedSCSIControllerType {
-		controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client)
+		controllerkey, _ := getControllerKey(controllerType, vmdevices)
		if controllerkey == virtualDiskControllerKey {
			return true, nil
		}
@@ -1010,7 +1071,7 @@ func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceL
// Returns key of the controller.
// Key is unique id that distinguishes one device from other devices in the same virtual machine.
-func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) {
+func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList) (int32, error) {
	for _, device := range vmDevices {
		devType := vmDevices.Type(device)
		if devType == scsiType {