diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD
index d410cf087c2..409ad725022 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/BUILD
@@ -29,6 +29,7 @@ go_library(
         "azure_standard.go",
         "azure_storage.go",
         "azure_storageaccount.go",
+        "azure_utils.go",
         "azure_vmsets.go",
         "azure_vmss.go",
         "azure_vmss_cache.go",
@@ -76,7 +77,6 @@ go_library(
         "//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",
         "//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
-        "//vendor/k8s.io/utils/keymutex:go_default_library",
         "//vendor/k8s.io/utils/net:go_default_library",
         "//vendor/sigs.k8s.io/yaml:go_default_library",
     ],
@@ -98,6 +98,7 @@ go_test(
         "azure_storage_test.go",
         "azure_storageaccount_test.go",
         "azure_test.go",
+        "azure_utils_test.go",
         "azure_vmss_cache_test.go",
         "azure_vmss_test.go",
         "azure_wrap_test.go",
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
index 5dd572fcf0b..63be11c2199 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
@@ -614,6 +614,7 @@ func initDiskControllers(az *Cloud) error {
 		resourceGroup:  az.ResourceGroup,
 		subscriptionID: az.SubscriptionID,
 		cloud:          az,
+		vmLockMap:      newLockMap(),
 	}
 
 	az.BlobDiskController = &BlobDiskController{common: common}
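The wiring above hands each disk controller its own lockMap. The hunks that follow key every lock operation on strings.ToLower(string(nodeName)) rather than on the Azure instance ID, so the detach path no longer needs a successful instance lookup just to build a lock key, and differently-cased spellings of the same node name collapse onto one mutex. A standalone sketch of the keying (the node names are made up):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Node names are not guaranteed to arrive in canonical case,
	// so the lock key is always lowercased first.
	for _, n := range []string{"K8S-Agent-0", "k8s-agent-0"} {
		fmt.Println(strings.ToLower(n)) // both print "k8s-agent-0"
	}
}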
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go
index f0401d42270..d8fc3b6798f 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go
@@ -33,7 +33,6 @@ import (
 	cloudprovider "k8s.io/cloud-provider"
 	volerr "k8s.io/cloud-provider/volume/errors"
 	"k8s.io/klog"
-	"k8s.io/utils/keymutex"
 )
 
 const (
@@ -58,9 +57,6 @@ var defaultBackOff = kwait.Backoff{
 	Jitter:   0.0,
 }
 
-// acquire lock to attach/detach disk in one node
-var diskOpMutex = keymutex.NewHashed(0)
-
 type controllerCommon struct {
 	subscriptionID string
 	location       string
@@ -68,7 +64,9 @@ type controllerCommon struct {
 	resourceGroup string
 	// store disk URI when disk is in attaching or detaching process
 	diskAttachDetachMap sync.Map
-	cloud               *Cloud
+	// vm disk map used to lock per vm update calls
+	vmLockMap *lockMap
+	cloud     *Cloud
 }
 
 // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
@@ -144,8 +142,8 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
 		return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
 	}
 
-	diskOpMutex.LockKey(instanceid)
-	defer diskOpMutex.UnlockKey(instanceid)
+	c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
+	defer c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
 
 	lun, err := c.GetNextDiskLun(nodeName)
 	if err != nil {
@@ -161,7 +159,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
 
 // DetachDisk detaches a disk from host. The vhd can be identified by diskName or diskURI.
 func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
-	instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
+	_, err := c.cloud.InstanceID(context.TODO(), nodeName)
 	if err != nil {
 		if err == cloudprovider.InstanceNotFound {
 			// if host doesn't exist, no need to detach
@@ -181,20 +179,20 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
 	klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
 
 	// make the lock here as small as possible
-	diskOpMutex.LockKey(instanceid)
+	c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
 	c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
 	resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
 	c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
-	diskOpMutex.UnlockKey(instanceid)
+	c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
 
 	if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
 		klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err)
 		retryErr := kwait.ExponentialBackoff(c.cloud.RequestBackoff(), func() (bool, error) {
-			diskOpMutex.LockKey(instanceid)
+			c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
 			c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
 			resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
 			c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
-			diskOpMutex.UnlockKey(instanceid)
+			c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
 			return c.cloud.processHTTPRetryResponse(nil, "", resp, err)
 		})
 		if retryErr != nil {
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common_test.go
index 905b95753c5..f54e470fac5 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common_test.go
@@ -68,6 +68,7 @@ func TestCommonAttachDisk(t *testing.T) {
 		resourceGroup:  testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud:          testCloud,
+		vmLockMap:      newLockMap(),
 	}
 	diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
 		testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -116,6 +117,7 @@ func TestCommonDetachDisk(t *testing.T) {
 		resourceGroup:  testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud:          testCloud,
+		vmLockMap:      newLockMap(),
 	}
 	diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
 		testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -156,6 +158,7 @@ func TestGetDiskLun(t *testing.T) {
 		resourceGroup:  testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud:          testCloud,
+		vmLockMap:      newLockMap(),
 	}
 
 	setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
@@ -194,6 +197,7 @@ func TestGetNextDiskLun(t *testing.T) {
 		resourceGroup:  testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud:          testCloud,
+		vmLockMap:      newLockMap(),
 	}
 
 	setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, test.isDataDisksFull)
@@ -235,6 +239,7 @@ func TestDisksAreAttached(t *testing.T) {
 		resourceGroup:  testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud:          testCloud,
+		vmLockMap:      newLockMap(),
 	}
 
 	setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
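Note how DetachDisk keeps the critical section as small as possible: the per-node lock is taken just around each vmset.DetachDisk call, including the one inside the backoff closure, rather than across the whole retry loop. A minimal sketch of that shape, with a plain mutex and a hypothetical doDetach standing in for the real lockMap entry and VM update:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// vmLock stands in for the per-node lockMap entry; doDetach stands in
// for vmset.DetachDisk. Both are illustrative, not part of the patch.
var vmLock sync.Mutex

func doDetach(diskURI string) error {
	fmt.Println("detaching", diskURI)
	return errors.New("transient failure")
}

// detachWithRetry locks only around each attempt, so other disk
// operations on the same node can run between retries instead of
// waiting out the whole backoff loop.
func detachWithRetry(diskURI string, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		vmLock.Lock()
		err = doDetach(diskURI)
		vmLock.Unlock()
		if err == nil {
			return nil
		}
	}
	return err
}

func main() {
	fmt.Println(detachWithRetry("disk-uri", 3))
}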
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go
new file mode 100644
index 00000000000..d4b5aba8015
--- /dev/null
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils.go
@@ -0,0 +1,71 @@
+// +build !providerless
+
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package azure
+
+import (
+	"sync"
+)
+
+// lockMap is used to lock on entries
+type lockMap struct {
+	sync.Mutex
+	mutexMap map[string]*sync.Mutex
+}
+
+// newLockMap returns a new lock map
+func newLockMap() *lockMap {
+	return &lockMap{
+		mutexMap: make(map[string]*sync.Mutex),
+	}
+}
+
+// LockEntry acquires a lock associated with the specific entry
+func (lm *lockMap) LockEntry(entry string) {
+	lm.Lock()
+	// if the entry does not exist yet, add it
+	if _, exists := lm.mutexMap[entry]; !exists {
+		lm.addEntry(entry)
+	}
+
+	lm.Unlock()
+	lm.lockEntry(entry)
+}
+
+// UnlockEntry releases the lock associated with the specific entry
+func (lm *lockMap) UnlockEntry(entry string) {
+	lm.Lock()
+	defer lm.Unlock()
+
+	if _, exists := lm.mutexMap[entry]; !exists {
+		return
+	}
+	lm.unlockEntry(entry)
+}
+
+func (lm *lockMap) addEntry(entry string) {
+	lm.mutexMap[entry] = &sync.Mutex{}
+}
+
+func (lm *lockMap) lockEntry(entry string) {
+	lm.mutexMap[entry].Lock()
+}
+
+func (lm *lockMap) unlockEntry(entry string) {
+	lm.mutexMap[entry].Unlock()
+}
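On its own, the type gives independent mutual exclusion per key: the embedded mutex only guards the map itself, while each entry's mutex serializes callers of that entry. A small usage sketch, assuming the definitions above are in the same package and "fmt" and "sync" are imported; the entry name is arbitrary:

func demoLockMap() {
	lm := newLockMap()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// All three goroutines serialize on the "node-1" entry;
			// locking a different entry would not block them.
			lm.LockEntry("node-1")
			defer lm.UnlockEntry("node-1")
			fmt.Printf("goroutine %d holds node-1\n", id)
		}(i)
	}
	wg.Wait()
}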
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go
new file mode 100644
index 00000000000..cb527f37dd2
--- /dev/null
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_utils_test.go
@@ -0,0 +1,85 @@
+// +build !providerless
+
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package azure
+
+import (
+	"testing"
+	"time"
+)
+
+func TestSimpleLockEntry(t *testing.T) {
+	testLockMap := newLockMap()
+
+	callbackChan1 := make(chan interface{})
+	go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
+	ensureCallbackHappens(t, callbackChan1)
+}
+
+func TestSimpleLockUnlockEntry(t *testing.T) {
+	testLockMap := newLockMap()
+
+	callbackChan1 := make(chan interface{})
+	go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
+	ensureCallbackHappens(t, callbackChan1)
+	testLockMap.UnlockEntry("entry1")
+}
+
+func TestConcurrentLockEntry(t *testing.T) {
+	testLockMap := newLockMap()
+
+	callbackChan1 := make(chan interface{})
+	callbackChan2 := make(chan interface{})
+
+	go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
+	ensureCallbackHappens(t, callbackChan1)
+
+	go testLockMap.lockAndCallback(t, "entry1", callbackChan2)
+	ensureNoCallback(t, callbackChan2)
+
+	testLockMap.UnlockEntry("entry1")
+	ensureCallbackHappens(t, callbackChan2)
+	testLockMap.UnlockEntry("entry1")
+}
+
+func (lm *lockMap) lockAndCallback(t *testing.T, entry string, callbackChan chan<- interface{}) {
+	lm.LockEntry(entry)
+	callbackChan <- true
+}
+
+var callbackTimeout = 2 * time.Second
+
+func ensureCallbackHappens(t *testing.T, callbackChan <-chan interface{}) bool {
+	select {
+	case <-callbackChan:
+		return true
+	case <-time.After(callbackTimeout):
+		t.Fatalf("timed out waiting for callback")
+		return false
+	}
+}
+
+func ensureNoCallback(t *testing.T, callbackChan <-chan interface{}) bool {
+	select {
+	case <-callbackChan:
+		t.Fatalf("unexpected callback")
+		return false
+	case <-time.After(callbackTimeout):
+		return true
+	}
+}
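These tests only exercise contention on a single entry. A companion test one could add (not part of this patch) to check that distinct entries do not block each other, reusing the helpers above:

func TestIndependentLockEntries(t *testing.T) {
	testLockMap := newLockMap()

	callbackChan1 := make(chan interface{})
	callbackChan2 := make(chan interface{})

	go testLockMap.lockAndCallback(t, "entry1", callbackChan1)
	ensureCallbackHappens(t, callbackChan1)

	// "entry2" has its own mutex, so this must not block behind "entry1".
	go testLockMap.lockAndCallback(t, "entry2", callbackChan2)
	ensureCallbackHappens(t, callbackChan2)

	testLockMap.UnlockEntry("entry1")
	testLockMap.UnlockEntry("entry2")
}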