Merge pull request #85115 from aramase/azure-disk-lock

azure: remove disk locks per vm during attach/detach
Kubernetes Prow Robot, 2019-11-13 23:13:48 -08:00 (committed by GitHub)
commit 5dd641e45c
6 changed files with 174 additions and 13 deletions
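
For orientation before the diff: the old code serialized attach/detach with keymutex.NewHashed, which hashes arbitrary keys onto a fixed pool of mutexes, so two different VMs could contend whenever their keys landed in the same bucket. The sketch below is illustrative only (not part of this commit) and shows the contract of the old primitive from k8s.io/utils/keymutex:

package main

import (
	"fmt"

	"k8s.io/utils/keymutex"
)

func main() {
	// NewHashed(n) returns a KeyMutex backed by a fixed pool of n mutexes;
	// n <= 0 sizes the pool from the number of CPUs. Keys are hashed onto
	// the pool, so distinct keys may share a mutex and block each other.
	diskOpMutex := keymutex.NewHashed(0)

	diskOpMutex.LockKey("vm-instance-id")
	defer diskOpMutex.UnlockKey("vm-instance-id")
	fmt.Println("holding the hashed lock for vm-instance-id")
}

The commit replaces this with a lockMap (added in azure_utils.go below) that owns exactly one mutex per lower-cased node name, so locking is exact per VM rather than hash-bucketed.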

View File

@@ -29,6 +29,7 @@ go_library(
"azure_standard.go",
"azure_storage.go",
"azure_storageaccount.go",
"azure_utils.go",
"azure_vmsets.go",
"azure_vmss.go",
"azure_vmss_cache.go",
@@ -76,7 +77,6 @@ go_library(
"//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",
"//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/keymutex:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
@@ -98,6 +98,7 @@ go_test(
"azure_storage_test.go",
"azure_storageaccount_test.go",
"azure_test.go",
"azure_utils_test.go",
"azure_vmss_cache_test.go",
"azure_vmss_test.go",
"azure_wrap_test.go",

View File

@@ -614,6 +614,7 @@ func initDiskControllers(az *Cloud) error {
resourceGroup: az.ResourceGroup,
subscriptionID: az.SubscriptionID,
cloud: az,
+ vmLockMap: newLockMap(),
}
az.BlobDiskController = &BlobDiskController{common: common}

View File

@@ -33,7 +33,6 @@ import (
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
"k8s.io/klog"
"k8s.io/utils/keymutex"
)
const (
@@ -58,9 +57,6 @@ var defaultBackOff = kwait.Backoff{
Jitter: 0.0,
}
- // acquire lock to attach/detach disk in one node
- var diskOpMutex = keymutex.NewHashed(0)
type controllerCommon struct {
subscriptionID string
location string
@@ -68,7 +64,9 @@ type controllerCommon struct {
resourceGroup string
// store disk URI when disk is in attaching or detaching process
diskAttachDetachMap sync.Map
- cloud *Cloud
+ // vmLockMap is used to lock per-VM update calls
+ vmLockMap *lockMap
+ cloud *Cloud
}
// getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
@@ -144,8 +142,8 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
}
- diskOpMutex.LockKey(instanceid)
- defer diskOpMutex.UnlockKey(instanceid)
+ c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
+ defer c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
lun, err := c.GetNextDiskLun(nodeName)
if err != nil {
@@ -161,7 +159,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
// DetachDisk detaches a disk from host. The vhd can be identified by diskName or diskURI.
func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
- instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
+ _, err := c.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// if host doesn't exist, no need to detach
@@ -181,20 +179,20 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
// make the lock here as small as possible
- diskOpMutex.LockKey(instanceid)
+ c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
- diskOpMutex.UnlockKey(instanceid)
+ c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err)
retryErr := kwait.ExponentialBackoff(c.cloud.RequestBackoff(), func() (bool, error) {
- diskOpMutex.LockKey(instanceid)
+ c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
- diskOpMutex.UnlockKey(instanceid)
+ c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
return c.cloud.processHTTPRetryResponse(nil, "", resp, err)
})
if retryErr != nil {

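The retry branch above re-runs the detach under the same per-node lock inside kwait.ExponentialBackoff. A minimal, self-contained sketch of that retry contract (the Backoff values are illustrative; the real code uses c.cloud.RequestBackoff()):

package main

import (
	"fmt"
	"time"

	kwait "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	backoff := kwait.Backoff{
		Duration: 1 * time.Second, // initial delay
		Factor:   2.0,             // multiplier applied per step
		Steps:    4,               // maximum number of attempts
		Jitter:   0.0,
	}

	attempts := 0
	err := kwait.ExponentialBackoff(backoff, func() (bool, error) {
		attempts++
		// (false, nil) means "retry after the next backoff interval",
		// (true, nil) means done, and a non-nil error aborts immediately.
		return attempts >= 3, nil
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}
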
View File

@@ -68,6 +68,7 @@ func TestCommonAttachDisk(t *testing.T) {
resourceGroup: testCloud.ResourceGroup,
subscriptionID: testCloud.SubscriptionID,
cloud: testCloud,
+ vmLockMap: newLockMap(),
}
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -116,6 +117,7 @@ func TestCommonDetachDisk(t *testing.T) {
resourceGroup: testCloud.ResourceGroup,
subscriptionID: testCloud.SubscriptionID,
cloud: testCloud,
+ vmLockMap: newLockMap(),
}
diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -156,6 +158,7 @@ func TestGetDiskLun(t *testing.T) {
resourceGroup: testCloud.ResourceGroup,
subscriptionID: testCloud.SubscriptionID,
cloud: testCloud,
+ vmLockMap: newLockMap(),
}
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
@@ -194,6 +197,7 @@ func TestGetNextDiskLun(t *testing.T) {
resourceGroup: testCloud.ResourceGroup,
subscriptionID: testCloud.SubscriptionID,
cloud: testCloud,
+ vmLockMap: newLockMap(),
}
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, test.isDataDisksFull)
@@ -235,6 +239,7 @@ func TestDisksAreAttached(t *testing.T) {
resourceGroup: testCloud.ResourceGroup,
subscriptionID: testCloud.SubscriptionID,
cloud: testCloud,
+ vmLockMap: newLockMap(),
}
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)

View File

@@ -0,0 +1,71 @@
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"sync"
)
// lockMap is used to lock on entries
type lockMap struct {
sync.Mutex
mutexMap map[string]*sync.Mutex
}
// newLockMap returns a new lock map
func newLockMap() *lockMap {
return &lockMap{
mutexMap: make(map[string]*sync.Mutex),
}
}
// LockEntry acquires a lock associated with the specific entry
func (lm *lockMap) LockEntry(entry string) {
lm.Lock()
// if the entry does not exist yet, add it
if _, exists := lm.mutexMap[entry]; !exists {
lm.addEntry(entry)
}
lm.Unlock()
lm.lockEntry(entry)
}
// UnlockEntry releases the lock associated with the specific entry
func (lm *lockMap) UnlockEntry(entry string) {
lm.Lock()
defer lm.Unlock()
if _, exists := lm.mutexMap[entry]; !exists {
return
}
lm.unlockEntry(entry)
}
func (lm *lockMap) addEntry(entry string) {
lm.mutexMap[entry] = &sync.Mutex{}
}
func (lm *lockMap) lockEntry(entry string) {
lm.mutexMap[entry].Lock()
}
func (lm *lockMap) unlockEntry(entry string) {
lm.mutexMap[entry].Unlock()
}
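
Assuming the lockMap definitions from azure_utils.go above, this illustrative sketch (not part of the commit) shows the intended usage: concurrent LockEntry calls on the same key serialize, while different keys proceed independently. Callers normalize keys with strings.ToLower, as AttachDisk and DetachDisk do:

package azure

import (
	"fmt"
	"strings"
	"sync"
)

func lockMapUsageSketch() {
	lm := newLockMap()
	var wg sync.WaitGroup
	for _, node := range []string{"Node-1", "node-1", "node-2"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			key := strings.ToLower(n) // "Node-1" and "node-1" share one mutex
			lm.LockEntry(key)         // blocks while another goroutine holds this key
			defer lm.UnlockEntry(key)
			fmt.Println("exclusive section for", key)
		}(node)
	}
	wg.Wait()
}

One trade-off worth noting: mutexMap entries are created on first use and never removed, so the map grows with the set of distinct keys. Keyed by node name, that growth is bounded by cluster size; the hashed pool it replaces was fixed-size but collision-prone.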

View File

@@ -0,0 +1,85 @@
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"testing"
"time"
)
func TestSimpleLockEntry(t *testing.T) {
testLockMap := newLockMap()
callbackChan1 := make(chan interface{})
go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
ensureCallbackHappens(t, callbackChan1)
}
func TestSimpleLockUnlockEntry(t *testing.T) {
testLockMap := newLockMap()
callbackChan1 := make(chan interface{})
go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
ensureCallbackHappens(t, callbackChan1)
testLockMap.UnlockEntry("entry1")
}
func TestConcurrentLockEntry(t *testing.T) {
testLockMap := newLockMap()
callbackChan1 := make(chan interface{})
callbackChan2 := make(chan interface{})
go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
ensureCallbackHappens(t, callbackChan1)
go testLockMap.lockAndCallback(t, "entry1", callbackChan2)
ensureNoCallback(t, callbackChan2)
testLockMap.UnlockEntry("entry1")
ensureCallbackHappens(t, callbackChan2)
testLockMap.UnlockEntry("entry1")
}
func (lm *lockMap) lockAndCallback(t *testing.T, entry string, callbackChan chan<- interface{}) {
lm.LockEntry(entry)
callbackChan <- true
}
var callbackTimeout = 2 * time.Second
func ensureCallbackHappens(t *testing.T, callbackChan <-chan interface{}) bool {
select {
case <-callbackChan:
return true
case <-time.After(callbackTimeout):
t.Fatalf("timed out waiting for callback")
return false
}
}
func ensureNoCallback(t *testing.T, callbackChan <-chan interface{}) bool {
select {
case <-callbackChan:
t.Fatalf("unexpected callback")
return false
case <-time.After(callbackTimeout):
return true
}
}