mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 21:17:23 +00:00
Merge pull request #44452 from gnufied/fix-aws-device-failure-reuse
Automatic merge from submit-queue. Implement LRU for AWS device allocator. On failure to attach, do not use the device from the pool. In an AWS environment, when attach fails on the node, let's not use that device from the pool. This makes sure that a bigger pool of devices is available.
This commit is contained in:
commit
36c5d12cf4
@ -1281,9 +1281,13 @@ func (c *Cloud) getMountDevice(
|
|||||||
// we want device names with two significant characters, starting with /dev/xvdbb
|
// we want device names with two significant characters, starting with /dev/xvdbb
|
||||||
// the allowed range is /dev/xvd[b-c][a-z]
|
// the allowed range is /dev/xvd[b-c][a-z]
|
||||||
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html
|
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html
|
||||||
deviceAllocator = NewDeviceAllocator(0)
|
deviceAllocator = NewDeviceAllocator()
|
||||||
c.deviceAllocators[i.nodeName] = deviceAllocator
|
c.deviceAllocators[i.nodeName] = deviceAllocator
|
||||||
}
|
}
|
||||||
|
// We need to lock deviceAllocator to prevent possible race with Deprioritize function
|
||||||
|
deviceAllocator.Lock()
|
||||||
|
defer deviceAllocator.Unlock()
|
||||||
|
|
||||||
chosen, err := deviceAllocator.GetNext(deviceMappings)
|
chosen, err := deviceAllocator.GetNext(deviceMappings)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("Could not assign a mount device. mappings=%v, error: %v", deviceMappings, err)
|
glog.Warningf("Could not assign a mount device. mappings=%v, error: %v", deviceMappings, err)
|
||||||
@ -1544,7 +1548,9 @@ func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, nodeName types.NodeName,
|
|||||||
// TODO: Check if the volume was concurrently attached?
|
// TODO: Check if the volume was concurrently attached?
|
||||||
return "", fmt.Errorf("Error attaching EBS volume %q to instance %q: %v", disk.awsID, awsInstance.awsID, err)
|
return "", fmt.Errorf("Error attaching EBS volume %q to instance %q: %v", disk.awsID, awsInstance.awsID, err)
|
||||||
}
|
}
|
||||||
|
if da, ok := c.deviceAllocators[awsInstance.nodeName]; ok {
|
||||||
|
da.Deprioritize(mountDevice)
|
||||||
|
}
|
||||||
glog.V(2).Infof("AttachVolume volume=%q instance=%q request returned %v", disk.awsID, awsInstance.awsID, attachResponse)
|
glog.V(2).Infof("AttachVolume volume=%q instance=%q request returned %v", disk.awsID, awsInstance.awsID, attachResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1621,6 +1627,9 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
if da, ok := c.deviceAllocators[awsInstance.nodeName]; ok {
|
||||||
|
da.Deprioritize(mountDevice)
|
||||||
|
}
|
||||||
if attachment != nil {
|
if attachment != nil {
|
||||||
// We expect it to be nil, it is (maybe) interesting if it is not
|
// We expect it to be nil, it is (maybe) interesting if it is not
|
||||||
glog.V(2).Infof("waitForAttachmentStatus returned non-nil attachment with state=detached: %v", attachment)
|
glog.V(2).Infof("waitForAttachmentStatus returned non-nil attachment with state=detached: %v", attachment)
|
||||||
|
@ -16,7 +16,11 @@ limitations under the License.
|
|||||||
|
|
||||||
package aws
|
package aws
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
// ExistingDevices is a map of assigned devices. Presence of a key with a device
|
// ExistingDevices is a map of assigned devices. Presence of a key with a device
|
||||||
// name in the map means that the device is allocated. Value is irrelevant and
|
// name in the map means that the device is allocated. Value is irrelevant and
|
||||||
@ -40,48 +44,87 @@ type DeviceAllocator interface {
|
|||||||
// name. Only the device suffix is returned, e.g. "ba" for "/dev/xvdba".
|
// name. Only the device suffix is returned, e.g. "ba" for "/dev/xvdba".
|
||||||
// It's up to the caller to add the appropriate "/dev/sd" or "/dev/xvd" prefix.
|
// It's up to the caller to add the appropriate "/dev/sd" or "/dev/xvd" prefix.
|
||||||
GetNext(existingDevices ExistingDevices) (mountDevice, error)
|
GetNext(existingDevices ExistingDevices) (mountDevice, error)
|
||||||
|
|
||||||
|
// Deprioritize the device so that it can't be used again immediately
|
||||||
|
Deprioritize(mountDevice)
|
||||||
|
|
||||||
|
// Lock the deviceAllocator
|
||||||
|
Lock()
|
||||||
|
|
||||||
|
// Unlock the deviceAllocator
|
||||||
|
Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
type deviceAllocator struct {
|
type deviceAllocator struct {
|
||||||
possibleDevices []mountDevice
|
possibleDevices map[mountDevice]int
|
||||||
lastIndex int
|
counter int
|
||||||
|
deviceLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ DeviceAllocator = &deviceAllocator{}
|
||||||
|
|
||||||
|
type devicePair struct {
|
||||||
|
deviceName mountDevice
|
||||||
|
deviceIndex int
|
||||||
|
}
|
||||||
|
|
||||||
|
type devicePairList []devicePair
|
||||||
|
|
||||||
|
func (p devicePairList) Len() int { return len(p) }
|
||||||
|
func (p devicePairList) Less(i, j int) bool { return p[i].deviceIndex < p[j].deviceIndex }
|
||||||
|
func (p devicePairList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||||
|
|
||||||
// Allocates device names according to scheme ba..bz, ca..cz
|
// Allocates device names according to scheme ba..bz, ca..cz
|
||||||
// it moves along the ring and always picks next device until
|
// it moves along the ring and always picks next device until
|
||||||
// device list is exhausted.
|
// device list is exhausted.
|
||||||
func NewDeviceAllocator(lastIndex int) DeviceAllocator {
|
func NewDeviceAllocator() DeviceAllocator {
|
||||||
possibleDevices := []mountDevice{}
|
possibleDevices := make(map[mountDevice]int)
|
||||||
for _, firstChar := range []rune{'b', 'c'} {
|
for _, firstChar := range []rune{'b', 'c'} {
|
||||||
for i := 'a'; i <= 'z'; i++ {
|
for i := 'a'; i <= 'z'; i++ {
|
||||||
dev := mountDevice([]rune{firstChar, i})
|
dev := mountDevice([]rune{firstChar, i})
|
||||||
possibleDevices = append(possibleDevices, dev)
|
possibleDevices[dev] = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &deviceAllocator{
|
return &deviceAllocator{
|
||||||
possibleDevices: possibleDevices,
|
possibleDevices: possibleDevices,
|
||||||
lastIndex: lastIndex,
|
counter: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNext gets next available device from the pool, this function assumes that caller
|
||||||
|
// holds the necessary lock on deviceAllocator
|
||||||
func (d *deviceAllocator) GetNext(existingDevices ExistingDevices) (mountDevice, error) {
|
func (d *deviceAllocator) GetNext(existingDevices ExistingDevices) (mountDevice, error) {
|
||||||
var candidate mountDevice
|
for _, devicePair := range d.sortByCount() {
|
||||||
foundIndex := d.lastIndex
|
if _, found := existingDevices[devicePair.deviceName]; !found {
|
||||||
for {
|
return devicePair.deviceName, nil
|
||||||
candidate, foundIndex = d.nextDevice(foundIndex + 1)
|
}
|
||||||
if _, found := existingDevices[candidate]; !found {
|
|
||||||
d.lastIndex = foundIndex
|
|
||||||
return candidate, nil
|
|
||||||
}
|
}
|
||||||
if foundIndex == d.lastIndex {
|
|
||||||
return "", fmt.Errorf("no devices are available")
|
return "", fmt.Errorf("no devices are available")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *deviceAllocator) sortByCount() devicePairList {
|
||||||
|
dpl := make(devicePairList, 0)
|
||||||
|
for deviceName, deviceIndex := range d.possibleDevices {
|
||||||
|
dpl = append(dpl, devicePair{deviceName, deviceIndex})
|
||||||
}
|
}
|
||||||
|
sort.Sort(dpl)
|
||||||
|
return dpl
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *deviceAllocator) nextDevice(nextIndex int) (mountDevice, int) {
|
func (d *deviceAllocator) Lock() {
|
||||||
if nextIndex < len(d.possibleDevices) {
|
d.deviceLock.Lock()
|
||||||
return d.possibleDevices[nextIndex], nextIndex
|
}
|
||||||
|
|
||||||
|
func (d *deviceAllocator) Unlock() {
|
||||||
|
d.deviceLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprioritize the device so that it can't be used again immediately
|
||||||
|
func (d *deviceAllocator) Deprioritize(chosen mountDevice) {
|
||||||
|
d.deviceLock.Lock()
|
||||||
|
defer d.deviceLock.Unlock()
|
||||||
|
if _, ok := d.possibleDevices[chosen]; ok {
|
||||||
|
d.counter++
|
||||||
|
d.possibleDevices[chosen] = d.counter
|
||||||
}
|
}
|
||||||
return d.possibleDevices[0], 0
|
|
||||||
}
|
}
|
||||||
|
@ -22,37 +22,22 @@ func TestDeviceAllocator(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
existingDevices ExistingDevices
|
existingDevices ExistingDevices
|
||||||
lastIndex int
|
deviceMap map[mountDevice]int
|
||||||
expectedOutput mountDevice
|
expectedOutput mountDevice
|
||||||
}{
|
}{
|
||||||
{
|
|
||||||
"empty device list",
|
|
||||||
ExistingDevices{},
|
|
||||||
0,
|
|
||||||
"bb",
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"empty device list with wrap",
|
"empty device list with wrap",
|
||||||
ExistingDevices{},
|
ExistingDevices{},
|
||||||
51,
|
generateUnsortedDeviceList(),
|
||||||
"ba", // next to 'zz' is the first one, 'ba'
|
"bd", // next to 'zz' is the first one, 'ba'
|
||||||
},
|
|
||||||
{
|
|
||||||
"device list",
|
|
||||||
ExistingDevices{"ba": "used", "bb": "used", "bc": "used"},
|
|
||||||
0,
|
|
||||||
"bd",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"device list with wrap",
|
|
||||||
ExistingDevices{"cy": "used", "cz": "used", "ba": "used"},
|
|
||||||
49,
|
|
||||||
"bb", // "cy", "cz" and "ba" are used
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
allocator := NewDeviceAllocator(test.lastIndex).(*deviceAllocator)
|
allocator := NewDeviceAllocator().(*deviceAllocator)
|
||||||
|
for k, v := range test.deviceMap {
|
||||||
|
allocator.possibleDevices[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
got, err := allocator.GetNext(test.existingDevices)
|
got, err := allocator.GetNext(test.existingDevices)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -64,8 +49,20 @@ func TestDeviceAllocator(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func generateUnsortedDeviceList() map[mountDevice]int {
|
||||||
|
possibleDevices := make(map[mountDevice]int)
|
||||||
|
for _, firstChar := range []rune{'b', 'c'} {
|
||||||
|
for i := 'a'; i <= 'z'; i++ {
|
||||||
|
dev := mountDevice([]rune{firstChar, i})
|
||||||
|
possibleDevices[dev] = 3
|
||||||
|
}
|
||||||
|
}
|
||||||
|
possibleDevices["bd"] = 0
|
||||||
|
return possibleDevices
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeviceAllocatorError(t *testing.T) {
|
func TestDeviceAllocatorError(t *testing.T) {
|
||||||
allocator := NewDeviceAllocator(0).(*deviceAllocator)
|
allocator := NewDeviceAllocator().(*deviceAllocator)
|
||||||
existingDevices := ExistingDevices{}
|
existingDevices := ExistingDevices{}
|
||||||
|
|
||||||
// make all devices used
|
// make all devices used
|
||||||
|
Loading…
Reference in New Issue
Block a user