diff --git a/pkg/cloudprovider/providers/azure/BUILD b/pkg/cloudprovider/providers/azure/BUILD index 322024f2c61..83cbf462c58 100644 --- a/pkg/cloudprovider/providers/azure/BUILD +++ b/pkg/cloudprovider/providers/azure/BUILD @@ -23,6 +23,7 @@ go_library( "azure_storage.go", "azure_storageaccount.go", "azure_util.go", + "azure_util_cache.go", "azure_util_vmss.go", "azure_vmsets.go", "azure_wrap.go", @@ -53,6 +54,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], ) @@ -62,6 +64,7 @@ go_test( srcs = [ "azure_loadbalancer_test.go", "azure_test.go", + "azure_util_cache_test.go", "azure_util_test.go", ], importpath = "k8s.io/kubernetes/pkg/cloudprovider/providers/azure", diff --git a/pkg/cloudprovider/providers/azure/azure_controllerCommon.go b/pkg/cloudprovider/providers/azure/azure_controllerCommon.go index 881a7dbb2c4..fdb78e2af7b 100644 --- a/pkg/cloudprovider/providers/azure/azure_controllerCommon.go +++ b/pkg/cloudprovider/providers/azure/azure_controllerCommon.go @@ -134,6 +134,8 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri } } else { glog.V(4).Infof("azureDisk - azure attach succeeded") + // Invalidate the cache right after updating + vmCache.Delete(vmName) } return err } @@ -192,6 +194,8 @@ func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName t glog.Errorf("azureDisk - azure disk detach failed, err: %v", err) } else { glog.V(4).Infof("azureDisk - azure disk detach succeeded") + // Invalidate the cache right after updating + vmCache.Delete(vmName) } return err } diff --git a/pkg/cloudprovider/providers/azure/azure_util.go b/pkg/cloudprovider/providers/azure/azure_util.go index 6181550571b..77996a0e5b5 100644 --- a/pkg/cloudprovider/providers/azure/azure_util.go +++ b/pkg/cloudprovider/providers/azure/azure_util.go @@ -435,11 +435,15 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error // GetZoneByNodeName gets zone from instance view. func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { - vm, err := as.VirtualMachinesClient.Get(as.ResourceGroup, name, compute.InstanceView) + vm, exists, err := as.getVirtualMachine(types.NodeName(name)) if err != nil { return cloudprovider.Zone{}, err } + if !exists { + return cloudprovider.Zone{}, cloudprovider.InstanceNotFound + } + failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain)) zone := cloudprovider.Zone{ FailureDomain: failureDomain, diff --git a/pkg/cloudprovider/providers/azure/azure_util_cache.go b/pkg/cloudprovider/providers/azure/azure_util_cache.go new file mode 100644 index 00000000000..8e416601ad8 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_util_cache.go @@ -0,0 +1,81 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "sync" + "time" + + "k8s.io/client-go/tools/cache" +) + +type timedcacheEntry struct { + key string + data interface{} +} + +type timedcache struct { + store cache.Store + lock sync.Mutex +} + +// ttl time.Duration +func newTimedcache(ttl time.Duration) timedcache { + return timedcache{ + store: cache.NewTTLStore(cacheKeyFunc, ttl), + } +} + +func cacheKeyFunc(obj interface{}) (string, error) { + return obj.(*timedcacheEntry).key, nil +} + +func (t *timedcache) GetOrCreate(key string, createFunc func() interface{}) (interface{}, error) { + entry, exists, err := t.store.GetByKey(key) + if err != nil { + return nil, err + } + if exists { + return (entry.(*timedcacheEntry)).data, nil + } + + t.lock.Lock() + defer t.lock.Unlock() + entry, exists, err = t.store.GetByKey(key) + if err != nil { + return nil, err + } + if exists { + return (entry.(*timedcacheEntry)).data, nil + } + + if createFunc == nil { + return nil, nil + } + created := createFunc() + t.store.Add(&timedcacheEntry{ + key: key, + data: created, + }) + return created, nil +} + +func (t *timedcache) Delete(key string) { + _ = t.store.Delete(&timedcacheEntry{ + key: key, + }) +} diff --git a/pkg/cloudprovider/providers/azure/azure_util_cache_test.go b/pkg/cloudprovider/providers/azure/azure_util_cache_test.go new file mode 100644 index 00000000000..0ac26d2e98a --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_util_cache_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "sync/atomic" + "testing" + "time" +) + +func TestCacheReturnsSameObject(t *testing.T) { + type cacheTestingStruct struct{} + c := newTimedcache(1 * time.Minute) + o1 := cacheTestingStruct{} + get1, _ := c.GetOrCreate("b1", func() interface{} { + return o1 + }) + o2 := cacheTestingStruct{} + get2, _ := c.GetOrCreate("b1", func() interface{} { + return o2 + }) + if get1 != get2 { + t.Error("Get not equal") + } +} + +func TestCacheCallsCreateFuncOnce(t *testing.T) { + var callsCount uint32 + f1 := func() interface{} { + atomic.AddUint32(&callsCount, 1) + return 1 + } + c := newTimedcache(500 * time.Millisecond) + for index := 0; index < 20; index++ { + _, _ = c.GetOrCreate("b1", f1) + } + + if callsCount != 1 { + t.Error("Count not match") + } + time.Sleep(500 * time.Millisecond) + c.GetOrCreate("b1", f1) + if callsCount != 2 { + t.Error("Count not match") + } +} + +func TestCacheExpires(t *testing.T) { + f1 := func() interface{} { + return 1 + } + c := newTimedcache(500 * time.Millisecond) + get1, _ := c.GetOrCreate("b1", f1) + if get1 != 1 { + t.Error("Value not equal") + } + time.Sleep(500 * time.Millisecond) + get1, _ = c.GetOrCreate("b1", nil) + if get1 != nil { + t.Error("value not expired") + } +} + +func TestCacheDelete(t *testing.T) { + f1 := func() interface{} { + return 1 + } + c := newTimedcache(500 * time.Millisecond) + get1, _ := c.GetOrCreate("b1", f1) + if get1 != 1 { + t.Error("Value not equal") + } + get1, _ = c.GetOrCreate("b1", nil) + if get1 != 1 { + t.Error("Value not equal") + } + c.Delete("b1") + get1, _ = c.GetOrCreate("b1", nil) + if get1 != nil { + t.Error("value not deleted") + } +} diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index f1aa0def597..7c28d57e8f4 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -18,6 +18,8 @@ package azure import ( "net/http" + "sync" + "time" "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" @@ -54,25 +56,65 @@ func ignoreStatusNotFoundFromError(err error) error { return err } +// cache used by getVirtualMachine +// 15s for expiration duration +var vmCache = newTimedcache(15 * time.Second) + +type vmRequest struct { + lock *sync.Mutex + vm *compute.VirtualMachine +} + +/// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache +/// The service side has throttling control that delays responses if there're multiple requests onto certain vm +/// resource request in short period. func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, exists bool, err error) { var realErr error vmName := string(nodeName) - az.operationPollRateLimiter.Accept() - glog.V(10).Infof("VirtualMachinesClient.Get(%s): start", vmName) - vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, "") - glog.V(10).Infof("VirtualMachinesClient.Get(%s): end", vmName) - exists, realErr = checkResourceExistsFromError(err) - if realErr != nil { - return vm, false, realErr + cachedRequest, err := vmCache.GetOrCreate(vmName, func() interface{} { + return &vmRequest{ + lock: &sync.Mutex{}, + vm: nil, + } + }) + if err != nil { + return compute.VirtualMachine{}, false, err + } + request := cachedRequest.(*vmRequest) + + if request.vm == nil { + request.lock.Lock() + defer request.lock.Unlock() + if request.vm == nil { + // Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView + // request. If we first send an InstanceView request and then a non InstanceView request, the second + // request will still hit throttling. This is what happens now for cloud controller manager: In this + // case we do get instance view every time to fulfill the azure_zones requirement without hitting + // throttling. + // Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed + az.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachinesClient.Get(%s): start", vmName) + vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, compute.InstanceView) + glog.V(10).Infof("VirtualMachinesClient.Get(%s): end", vmName) + + exists, realErr = checkResourceExistsFromError(err) + if realErr != nil { + return vm, false, realErr + } + + if !exists { + return vm, false, nil + } + + request.vm = &vm + } + return vm, exists, err } - if !exists { - return vm, false, nil - } - - return vm, exists, err + glog.V(6).Infof("getVirtualMachine hits cache for(%s)", vmName) + return *request.vm, true, nil } func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) {