From 259dbf8da7ed43bd276bac6cf8bd2bf9f7744253 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 7 Feb 2018 17:05:32 +0800 Subject: [PATCH 1/7] Make azure cache general for all objects --- .../providers/azure/azure_cache.go | 107 +++++++++++++----- 1 file changed, 80 insertions(+), 27 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure_cache.go b/pkg/cloudprovider/providers/azure/azure_cache.go index 8e416601ad8..0c4ee04a752 100644 --- a/pkg/cloudprovider/providers/azure/azure_cache.go +++ b/pkg/cloudprovider/providers/azure/azure_cache.go @@ -17,40 +17,62 @@ limitations under the License. package azure import ( + "fmt" "sync" "time" "k8s.io/client-go/tools/cache" ) -type timedcacheEntry struct { +// getFunc defines a getter function for timedCache. +type getFunc func(key string) (interface{}, error) + +// cacheEntry is the internal structure stores inside TTLStore. +type cacheEntry struct { key string data interface{} + + // The lock to ensure not updating same entry simultaneously. + lock sync.Mutex } -type timedcache struct { - store cache.Store - lock sync.Mutex -} - -// ttl time.Duration -func newTimedcache(ttl time.Duration) timedcache { - return timedcache{ - store: cache.NewTTLStore(cacheKeyFunc, ttl), - } -} - +// cacheKeyFunc defines the key function required in TTLStore. func cacheKeyFunc(obj interface{}) (string, error) { - return obj.(*timedcacheEntry).key, nil + if entry, ok := obj.(*cacheEntry); ok { + return entry.key, nil + } + + return "", fmt.Errorf("obj %q is not an object of cacheEntry", obj) } -func (t *timedcache) GetOrCreate(key string, createFunc func() interface{}) (interface{}, error) { +// timedCache is a cache with TTL. +type timedCache struct { + store cache.Store + lock sync.Mutex + getter getFunc +} + +// newTimedcache creates a new timedCache. +func newTimedcache(ttl time.Duration, getter getFunc) (*timedCache, error) { + if getter == nil { + return nil, fmt.Errorf("getter is not provided") + } + + return &timedCache{ + getter: getter, + store: cache.NewTTLStore(cacheKeyFunc, ttl), + }, nil +} + +// getInternal returns cacheEntry by key. If the key is not cached yet, +// it returns a cacheEntry with nil data. +func (t *timedCache) getInternal(key string) (*cacheEntry, error) { entry, exists, err := t.store.GetByKey(key) if err != nil { return nil, err } if exists { - return (entry.(*timedcacheEntry)).data, nil + return entry.(*cacheEntry), nil } t.lock.Lock() @@ -60,22 +82,53 @@ func (t *timedcache) GetOrCreate(key string, createFunc func() interface{}) (int return nil, err } if exists { - return (entry.(*timedcacheEntry)).data, nil + return entry.(*cacheEntry), nil } - if createFunc == nil { - return nil, nil - } - created := createFunc() - t.store.Add(&timedcacheEntry{ + // Still not found, add new entry with nil data. + // Note the data will be filled later by getter. + newEntry := &cacheEntry{ key: key, - data: created, - }) - return created, nil + data: nil, + } + t.store.Add(newEntry) + return newEntry, nil } -func (t *timedcache) Delete(key string) { - _ = t.store.Delete(&timedcacheEntry{ +// Get returns the requested item by key. +func (t *timedCache) Get(key string) (interface{}, error) { + entry, err := t.getInternal(key) + if err != nil { + return nil, err + } + + // Data is still not cached yet, cache it by getter. + if entry.data == nil { + entry.lock.Lock() + defer entry.lock.Unlock() + + data, err := t.getter(key) + if err != nil { + return nil, err + } + + entry.data = data + } + + return entry.data, nil +} + +// Update sets an item in the cache to its updated state. +func (t *timedCache) Update(key string, data interface{}) error { + return t.store.Update(&cacheEntry{ + key: key, + data: data, + }) +} + +// Delete removes an item from the cache. +func (t *timedCache) Delete(key string) error { + return t.store.Delete(&cacheEntry{ key: key, }) } From 035c8da63d61fad499373c0dd451d47498e9badd Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 7 Feb 2018 17:05:51 +0800 Subject: [PATCH 2/7] New unit tests for timedCache --- .../providers/azure/azure_cache_test.go | 161 +++++++++++------- 1 file changed, 102 insertions(+), 59 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure_cache_test.go b/pkg/cloudprovider/providers/azure/azure_cache_test.go index 0ac26d2e98a..1608f7d711f 100644 --- a/pkg/cloudprovider/providers/azure/azure_cache_test.go +++ b/pkg/cloudprovider/providers/azure/azure_cache_test.go @@ -17,80 +17,123 @@ limitations under the License. package azure import ( - "sync/atomic" + "sync" "testing" "time" + + "github.com/stretchr/testify/assert" ) -func TestCacheReturnsSameObject(t *testing.T) { - type cacheTestingStruct struct{} - c := newTimedcache(1 * time.Minute) - o1 := cacheTestingStruct{} - get1, _ := c.GetOrCreate("b1", func() interface{} { - return o1 - }) - o2 := cacheTestingStruct{} - get2, _ := c.GetOrCreate("b1", func() interface{} { - return o2 - }) - if get1 != get2 { - t.Error("Get not equal") - } +var ( + fakeCacheTTL = 2 * time.Second +) + +type fakeDataObj struct{} + +type fakeDataSource struct { + data map[string]*fakeDataObj + lock sync.Mutex } -func TestCacheCallsCreateFuncOnce(t *testing.T) { - var callsCount uint32 - f1 := func() interface{} { - atomic.AddUint32(&callsCount, 1) - return 1 - } - c := newTimedcache(500 * time.Millisecond) - for index := 0; index < 20; index++ { - _, _ = c.GetOrCreate("b1", f1) +func (fake *fakeDataSource) get(key string) (interface{}, error) { + fake.lock.Lock() + defer fake.lock.Unlock() + + if v, ok := fake.data[key]; ok { + return v, nil } - if callsCount != 1 { - t.Error("Count not match") - } - time.Sleep(500 * time.Millisecond) - c.GetOrCreate("b1", f1) - if callsCount != 2 { - t.Error("Count not match") - } + return nil, nil } -func TestCacheExpires(t *testing.T) { - f1 := func() interface{} { - return 1 +func (fake *fakeDataSource) set(data map[string]*fakeDataObj) { + fake.lock.Lock() + defer fake.lock.Unlock() + + fake.data = data +} + +func newFakeCache(t *testing.T) (*fakeDataSource, *timedCache) { + dataSource := &fakeDataSource{ + data: make(map[string]*fakeDataObj), } - c := newTimedcache(500 * time.Millisecond) - get1, _ := c.GetOrCreate("b1", f1) - if get1 != 1 { - t.Error("Value not equal") + getter := dataSource.get + cache, err := newTimedcache(fakeCacheTTL, getter) + assert.NoError(t, err) + return dataSource, cache +} + +func TestCacheGet(t *testing.T) { + val := &fakeDataObj{} + cases := []struct { + name string + data map[string]*fakeDataObj + key string + expected interface{} + }{ + { + name: "cache should return nil for empty data source", + key: "key1", + expected: nil, + }, + { + name: "cache should return nil for non exist key", + data: map[string]*fakeDataObj{"key2": val}, + key: "key1", + expected: nil, + }, + { + name: "cache should return data for existing key", + data: map[string]*fakeDataObj{"key1": val}, + key: "key1", + expected: val, + }, } - time.Sleep(500 * time.Millisecond) - get1, _ = c.GetOrCreate("b1", nil) - if get1 != nil { - t.Error("value not expired") + + for _, c := range cases { + dataSource, cache := newFakeCache(t) + dataSource.set(c.data) + val, err := cache.Get(c.key) + assert.NoError(t, err, c.name) + assert.Equal(t, c.expected, val, c.name) } } func TestCacheDelete(t *testing.T) { - f1 := func() interface{} { - return 1 - } - c := newTimedcache(500 * time.Millisecond) - get1, _ := c.GetOrCreate("b1", f1) - if get1 != 1 { - t.Error("Value not equal") - } - get1, _ = c.GetOrCreate("b1", nil) - if get1 != 1 { - t.Error("Value not equal") - } - c.Delete("b1") - get1, _ = c.GetOrCreate("b1", nil) - if get1 != nil { - t.Error("value not deleted") + key := "key1" + val := &fakeDataObj{} + data := map[string]*fakeDataObj{ + key: val, } + dataSource, cache := newFakeCache(t) + dataSource.set(data) + + v, err := cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, val, v, "cache should get correct data") + + dataSource.set(nil) + cache.Delete(key) + v, err = cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, nil, v, "cache should get nil after data is removed") +} + +func TestCacheExpired(t *testing.T) { + key := "key1" + val := &fakeDataObj{} + data := map[string]*fakeDataObj{ + key: val, + } + dataSource, cache := newFakeCache(t) + dataSource.set(data) + + v, err := cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, val, v, "cache should get correct data") + + time.Sleep(fakeCacheTTL) + v, err = cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, val, v, "cache should get correct data even after expired") } From 2badf1ff55a9b046ee49bdd94ca10a07adf79c54 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 7 Feb 2018 17:06:16 +0800 Subject: [PATCH 3/7] Add cache for virtual machines --- pkg/cloudprovider/providers/azure/azure.go | 8 ++ .../providers/azure/azure_controllerCommon.go | 4 +- .../providers/azure/azure_test.go | 1 + .../providers/azure/azure_wrap.go | 76 +++++++------------ 4 files changed, 40 insertions(+), 49 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 241920e5f04..d42c8d4669e 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -135,6 +135,8 @@ type Cloud struct { VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient + vmCache *timedCache + *BlobDiskController *ManagedDiskController *controllerCommon @@ -244,6 +246,12 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az.vmSet = newAvailabilitySet(&az) } + vmCache, err := az.newVMCache() + if err != nil { + return nil, err + } + az.vmCache = vmCache + if err := initDiskControllers(&az); err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_controllerCommon.go b/pkg/cloudprovider/providers/azure/azure_controllerCommon.go index 463cd04f4a8..e7d70687411 100644 --- a/pkg/cloudprovider/providers/azure/azure_controllerCommon.go +++ b/pkg/cloudprovider/providers/azure/azure_controllerCommon.go @@ -124,7 +124,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri } else { glog.V(4).Info("azureDisk - azure attach succeeded") // Invalidate the cache right after updating - vmCache.Delete(vmName) + c.cloud.vmCache.Delete(vmName) } return err } @@ -183,7 +183,7 @@ func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName t } else { glog.V(4).Info("azureDisk - azure disk detach succeeded") // Invalidate the cache right after updating - vmCache.Delete(vmName) + c.cloud.vmCache.Delete(vmName) } return err } diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 21fcf0bbe94..03953383438 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -878,6 +878,7 @@ func getTestCloud() (az *Cloud) { az.VirtualMachineScaleSetVMsClient = newFakeVirtualMachineScaleSetVMsClient() az.VirtualMachinesClient = newFakeAzureVirtualMachinesClient() az.vmSet = newAvailabilitySet(az) + az.vmCache, _ = az.newVMCache() return az } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index e37ac8ca950..70dbd41740d 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -18,18 +18,20 @@ package azure import ( "net/http" - "sync" "time" "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" "github.com/Azure/go-autorest/autorest" - "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/cloudprovider" ) +var ( + vmCacheTTL = time.Minute +) + // checkExistsFromError inspects an error and returns a true if err is nil, // false if error is an autorest.Error with StatusCode=404 and will return the // error back if error is another status code or another type of error. @@ -60,59 +62,21 @@ func ignoreStatusNotFoundFromError(err error) error { return err } -// cache used by getVirtualMachine -// 15s for expiration duration -var vmCache = newTimedcache(15 * time.Second) - -type vmRequest struct { - lock *sync.Mutex - vm *compute.VirtualMachine -} - /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache /// The service side has throttling control that delays responses if there're multiple requests onto certain vm /// resource request in short period. func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, err error) { vmName := string(nodeName) - - cachedRequest, err := vmCache.GetOrCreate(vmName, func() interface{} { - return &vmRequest{ - lock: &sync.Mutex{}, - vm: nil, - } - }) + cachedVM, err := az.vmCache.Get(vmName) if err != nil { - return compute.VirtualMachine{}, err - } - request := cachedRequest.(*vmRequest) - - if request.vm == nil { - request.lock.Lock() - defer request.lock.Unlock() - if request.vm == nil { - // Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView - // request. If we first send an InstanceView request and then a non InstanceView request, the second - // request will still hit throttling. This is what happens now for cloud controller manager: In this - // case we do get instance view every time to fulfill the azure_zones requirement without hitting - // throttling. - // Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed - vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, compute.InstanceView) - exists, realErr := checkResourceExistsFromError(err) - if realErr != nil { - return vm, realErr - } - - if !exists { - return vm, cloudprovider.InstanceNotFound - } - - request.vm = &vm - } - return *request.vm, nil + return vm, err } - glog.V(6).Infof("getVirtualMachine hits cache for(%s)", vmName) - return *request.vm, nil + if cachedVM == nil { + return vm, cloudprovider.InstanceNotFound + } + + return *(cachedVM.(*compute.VirtualMachine)), nil } func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) { @@ -189,3 +153,21 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi return lb, exists, err } + +func (az *Cloud) newVMCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + vm, err := az.VirtualMachinesClient.Get(az.ResourceGroup, key, compute.InstanceView) + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &vm, nil + } + + return newTimedcache(vmCacheTTL, getter) +} From d22b6d9ebe46698af5ceb15fdb7d04f9485ed8ae Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 7 Feb 2018 17:19:34 +0800 Subject: [PATCH 4/7] Add cache for load balancer --- pkg/cloudprovider/providers/azure/azure.go | 7 ++++ .../providers/azure/azure_backoff.go | 14 ++++++-- .../providers/azure/azure_test.go | 1 + .../providers/azure/azure_wrap.go | 32 ++++++++++++++----- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index d42c8d4669e..6b40abdfeec 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -136,6 +136,7 @@ type Cloud struct { VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient vmCache *timedCache + lbCache *timedCache *BlobDiskController *ManagedDiskController @@ -252,6 +253,12 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { } az.vmCache = vmCache + lbCache, err := az.newLBCache() + if err != nil { + return nil, err + } + az.lbCache = lbCache + if err := initDiskControllers(&az); err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go index 5d9a2af2ebb..ddc2f69ff52 100644 --- a/pkg/cloudprovider/providers/azure/azure_backoff.go +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -142,7 +142,12 @@ func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error { resp := <-respChan err := <-errChan glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name) - return processRetryResponse(resp.Response, err) + done, err := processRetryResponse(resp.Response, err) + if done && err == nil { + // Invalidate the cache right after updating + az.lbCache.Delete(*lb.Name) + } + return done, err }) } @@ -283,7 +288,12 @@ func (az *Cloud) DeleteLBWithRetry(lbName string) error { respChan, errChan := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) resp := <-respChan err := <-errChan - return processRetryResponse(resp, err) + done, err := processRetryResponse(resp, err) + if done && err == nil { + // Invalidate the cache right after deleting + az.lbCache.Delete(lbName) + } + return done, err }) } diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 03953383438..8e9c1d8bbf6 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -879,6 +879,7 @@ func getTestCloud() (az *Cloud) { az.VirtualMachinesClient = newFakeAzureVirtualMachinesClient() az.vmSet = newAvailabilitySet(az) az.vmCache, _ = az.newVMCache() + az.lbCache, _ = az.newLBCache() return az } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index 70dbd41740d..c75bd62a70e 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -30,6 +30,7 @@ import ( var ( vmCacheTTL = time.Minute + lbCacheTTL = 2 * time.Minute ) // checkExistsFromError inspects an error and returns a true if err is nil, @@ -139,19 +140,16 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet } func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) { - var realErr error - - lb, err = az.LoadBalancerClient.Get(az.ResourceGroup, name, "") - exists, realErr = checkResourceExistsFromError(err) - if realErr != nil { - return lb, false, realErr + cachedLB, err := az.lbCache.Get(name) + if err != nil { + return lb, false, err } - if !exists { + if cachedLB == nil { return lb, false, nil } - return lb, exists, err + return *(cachedLB.(*network.LoadBalancer)), true, nil } func (az *Cloud) newVMCache() (*timedCache, error) { @@ -171,3 +169,21 @@ func (az *Cloud) newVMCache() (*timedCache, error) { return newTimedcache(vmCacheTTL, getter) } + +func (az *Cloud) newLBCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + lb, err := az.LoadBalancerClient.Get(az.ResourceGroup, key, "") + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &lb, nil + } + + return newTimedcache(lbCacheTTL, getter) +} From 21c8a63689f6e0cf2d31aaf09c0452f3ea1de625 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 7 Feb 2018 21:57:11 +0800 Subject: [PATCH 5/7] Add cache for network security groups --- pkg/cloudprovider/providers/azure/azure.go | 11 +++++- .../providers/azure/azure_backoff.go | 7 +++- .../providers/azure/azure_loadbalancer.go | 2 +- .../providers/azure/azure_test.go | 1 + .../providers/azure/azure_wrap.go | 37 ++++++++++++++++++- 5 files changed, 52 insertions(+), 6 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 6b40abdfeec..1fd3eed2fc0 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -135,8 +135,9 @@ type Cloud struct { VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient - vmCache *timedCache - lbCache *timedCache + vmCache *timedCache + lbCache *timedCache + nsgCache *timedCache *BlobDiskController *ManagedDiskController @@ -259,6 +260,12 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { } az.lbCache = lbCache + nsgCache, err := az.newNSGCache() + if err != nil { + return nil, err + } + az.nsgCache = nsgCache + if err := initDiskControllers(&az); err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go index ddc2f69ff52..64c212d40a9 100644 --- a/pkg/cloudprovider/providers/azure/azure_backoff.go +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -131,7 +131,12 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error { resp := <-respChan err := <-errChan glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) - return processRetryResponse(resp.Response, err) + done, err := processRetryResponse(resp.Response, err) + if done && err == nil { + // Invalidate the cache right after updating + az.lbCache.Delete(*sg.Name) + } + return done, err }) } diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index 8b4387cf63a..65b6f9c6287 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -819,7 +819,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, ports = []v1.ServicePort{} } - sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "") + sg, err := az.getSecurityGroup() if err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 8e9c1d8bbf6..8226815efa2 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -880,6 +880,7 @@ func getTestCloud() (az *Cloud) { az.vmSet = newAvailabilitySet(az) az.vmCache, _ = az.newVMCache() az.lbCache, _ = az.newLBCache() + az.nsgCache, _ = az.newNSGCache() return az } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index c75bd62a70e..792946cc907 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -17,6 +17,7 @@ limitations under the License. package azure import ( + "fmt" "net/http" "time" @@ -29,8 +30,9 @@ import ( ) var ( - vmCacheTTL = time.Minute - lbCacheTTL = 2 * time.Minute + vmCacheTTL = time.Minute + lbCacheTTL = 2 * time.Minute + nsgCacheTTL = 2 * time.Minute ) // checkExistsFromError inspects an error and returns a true if err is nil, @@ -152,6 +154,19 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi return *(cachedLB.(*network.LoadBalancer)), true, nil } +func (az *Cloud) getSecurityGroup() (nsg network.SecurityGroup, err error) { + securityGroup, err := az.nsgCache.Get(az.SecurityGroupName) + if err != nil { + return nsg, err + } + + if securityGroup == nil { + return nsg, fmt.Errorf("nsg %q not found", az.SecurityGroupName) + } + + return *(securityGroup.(*network.SecurityGroup)), nil +} + func (az *Cloud) newVMCache() (*timedCache, error) { getter := func(key string) (interface{}, error) { vm, err := az.VirtualMachinesClient.Get(az.ResourceGroup, key, compute.InstanceView) @@ -187,3 +202,21 @@ func (az *Cloud) newLBCache() (*timedCache, error) { return newTimedcache(lbCacheTTL, getter) } + +func (az *Cloud) newNSGCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + nsg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, key, "") + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &nsg, nil + } + + return newTimedcache(nsgCacheTTL, getter) +} From daec2bd7455ddd427616e4d7e4a2459a67f0f3a3 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 7 Feb 2018 22:26:03 +0800 Subject: [PATCH 6/7] Add cache for route tables --- pkg/cloudprovider/providers/azure/azure.go | 7 ++++ .../providers/azure/azure_routes.go | 7 ++-- .../providers/azure/azure_routes_test.go | 1 + .../providers/azure/azure_test.go | 1 + .../providers/azure/azure_wrap.go | 32 ++++++++++++++----- 5 files changed, 36 insertions(+), 12 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 1fd3eed2fc0..d26ea04080b 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -138,6 +138,7 @@ type Cloud struct { vmCache *timedCache lbCache *timedCache nsgCache *timedCache + rtCache *timedCache *BlobDiskController *ManagedDiskController @@ -266,6 +267,12 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { } az.nsgCache = nsgCache + rtCache, err := az.newRouteTableCache() + if err != nil { + return nil, err + } + az.rtCache = rtCache + if err := initDiskControllers(&az); err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_routes.go b/pkg/cloudprovider/providers/azure/azure_routes.go index e08ca912581..c78de23d730 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes.go +++ b/pkg/cloudprovider/providers/azure/azure_routes.go @@ -98,10 +98,9 @@ func (az *Cloud) createRouteTable() error { return err } - glog.V(10).Infof("RouteTablesClient.Get(%q): start", az.RouteTableName) - _, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "") - glog.V(10).Infof("RouteTablesClient.Get(%q): end", az.RouteTableName) - return err + // Invalidate the cache right after updating + az.rtCache.Delete(az.RouteTableName) + return nil } // CreateRoute creates the described managed route diff --git a/pkg/cloudprovider/providers/azure/azure_routes_test.go b/pkg/cloudprovider/providers/azure/azure_routes_test.go index 02041f0fa22..31f31ee4eb4 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes_test.go +++ b/pkg/cloudprovider/providers/azure/azure_routes_test.go @@ -37,6 +37,7 @@ func TestCreateRoute(t *testing.T) { Location: "location", }, } + cloud.rtCache, _ = cloud.newRouteTableCache() expectedTable := network.RouteTable{ Name: &cloud.RouteTableName, Location: &cloud.Location, diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 8226815efa2..e638e4522d4 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -881,6 +881,7 @@ func getTestCloud() (az *Cloud) { az.vmCache, _ = az.newVMCache() az.lbCache, _ = az.newLBCache() az.nsgCache, _ = az.newNSGCache() + az.rtCache, _ = az.newRouteTableCache() return az } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index 792946cc907..d2bb36fe81a 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -33,6 +33,7 @@ var ( vmCacheTTL = time.Minute lbCacheTTL = 2 * time.Minute nsgCacheTTL = 2 * time.Minute + rtCacheTTL = 2 * time.Minute ) // checkExistsFromError inspects an error and returns a true if err is nil, @@ -83,19 +84,16 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM } func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) { - var realErr error - - routeTable, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "") - exists, realErr = checkResourceExistsFromError(err) - if realErr != nil { - return routeTable, false, realErr + cachedRt, err := az.rtCache.Get(az.RouteTableName) + if err != nil { + return routeTable, false, err } - if !exists { + if cachedRt == nil { return routeTable, false, nil } - return routeTable, exists, err + return *(cachedRt.(*network.RouteTable)), true, nil } func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string) (pip network.PublicIPAddress, exists bool, err error) { @@ -220,3 +218,21 @@ func (az *Cloud) newNSGCache() (*timedCache, error) { return newTimedcache(nsgCacheTTL, getter) } + +func (az *Cloud) newRouteTableCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + rt, err := az.RouteTablesClient.Get(az.ResourceGroup, key, "") + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &rt, nil + } + + return newTimedcache(rtCacheTTL, getter) +} From 7634eacb4fbb70996bab322c91ba5edcf839dee5 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Fri, 9 Feb 2018 20:38:29 +0800 Subject: [PATCH 7/7] Add error handling and new tests --- pkg/cloudprovider/providers/azure/azure.go | 12 +++------ .../providers/azure/azure_backoff.go | 2 +- .../providers/azure/azure_cache.go | 26 ++++++------------- .../providers/azure/azure_cache_test.go | 25 ++++++++++++++++-- .../providers/azure/azure_wrap.go | 6 +++++ 5 files changed, 42 insertions(+), 29 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index d26ea04080b..1f6983d8594 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -249,29 +249,25 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az.vmSet = newAvailabilitySet(&az) } - vmCache, err := az.newVMCache() + az.vmCache, err = az.newVMCache() if err != nil { return nil, err } - az.vmCache = vmCache - lbCache, err := az.newLBCache() + az.lbCache, err = az.newLBCache() if err != nil { return nil, err } - az.lbCache = lbCache - nsgCache, err := az.newNSGCache() + az.nsgCache, err = az.newNSGCache() if err != nil { return nil, err } - az.nsgCache = nsgCache - rtCache, err := az.newRouteTableCache() + az.rtCache, err = az.newRouteTableCache() if err != nil { return nil, err } - az.rtCache = rtCache if err := initDiskControllers(&az); err != nil { return nil, err diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go index 64c212d40a9..39d2a291b5a 100644 --- a/pkg/cloudprovider/providers/azure/azure_backoff.go +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -134,7 +134,7 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error { done, err := processRetryResponse(resp.Response, err) if done && err == nil { // Invalidate the cache right after updating - az.lbCache.Delete(*sg.Name) + az.nsgCache.Delete(*sg.Name) } return done, err }) diff --git a/pkg/cloudprovider/providers/azure/azure_cache.go b/pkg/cloudprovider/providers/azure/azure_cache.go index 0c4ee04a752..fd69eadbff2 100644 --- a/pkg/cloudprovider/providers/azure/azure_cache.go +++ b/pkg/cloudprovider/providers/azure/azure_cache.go @@ -38,11 +38,7 @@ type cacheEntry struct { // cacheKeyFunc defines the key function required in TTLStore. func cacheKeyFunc(obj interface{}) (string, error) { - if entry, ok := obj.(*cacheEntry); ok { - return entry.key, nil - } - - return "", fmt.Errorf("obj %q is not an object of cacheEntry", obj) + return obj.(*cacheEntry).key, nil } // timedCache is a cache with TTL. @@ -107,25 +103,19 @@ func (t *timedCache) Get(key string) (interface{}, error) { entry.lock.Lock() defer entry.lock.Unlock() - data, err := t.getter(key) - if err != nil { - return nil, err - } + if entry.data == nil { + data, err := t.getter(key) + if err != nil { + return nil, err + } - entry.data = data + entry.data = data + } } return entry.data, nil } -// Update sets an item in the cache to its updated state. -func (t *timedCache) Update(key string, data interface{}) error { - return t.store.Update(&cacheEntry{ - key: key, - data: data, - }) -} - // Delete removes an item from the cache. func (t *timedCache) Delete(key string) error { return t.store.Delete(&cacheEntry{ diff --git a/pkg/cloudprovider/providers/azure/azure_cache_test.go b/pkg/cloudprovider/providers/azure/azure_cache_test.go index 1608f7d711f..574d2cb119b 100644 --- a/pkg/cloudprovider/providers/azure/azure_cache_test.go +++ b/pkg/cloudprovider/providers/azure/azure_cache_test.go @@ -17,6 +17,7 @@ limitations under the License. package azure import ( + "fmt" "sync" "testing" "time" @@ -31,14 +32,16 @@ var ( type fakeDataObj struct{} type fakeDataSource struct { - data map[string]*fakeDataObj - lock sync.Mutex + called int + data map[string]*fakeDataObj + lock sync.Mutex } func (fake *fakeDataSource) get(key string) (interface{}, error) { fake.lock.Lock() defer fake.lock.Unlock() + fake.called = fake.called + 1 if v, ok := fake.data[key]; ok { return v, nil } @@ -51,6 +54,7 @@ func (fake *fakeDataSource) set(data map[string]*fakeDataObj) { defer fake.lock.Unlock() fake.data = data + fake.called = 0 } func newFakeCache(t *testing.T) (*fakeDataSource, *timedCache) { @@ -99,6 +103,20 @@ func TestCacheGet(t *testing.T) { } } +func TestCacheGetError(t *testing.T) { + getError := fmt.Errorf("getError") + getter := func(key string) (interface{}, error) { + return nil, getError + } + cache, err := newTimedcache(fakeCacheTTL, getter) + assert.NoError(t, err) + + val, err := cache.Get("key") + assert.Error(t, err) + assert.Equal(t, getError, err) + assert.Nil(t, val) +} + func TestCacheDelete(t *testing.T) { key := "key1" val := &fakeDataObj{} @@ -116,6 +134,7 @@ func TestCacheDelete(t *testing.T) { cache.Delete(key) v, err = cache.Get(key) assert.NoError(t, err) + assert.Equal(t, 1, dataSource.called) assert.Equal(t, nil, v, "cache should get nil after data is removed") } @@ -130,10 +149,12 @@ func TestCacheExpired(t *testing.T) { v, err := cache.Get(key) assert.NoError(t, err) + assert.Equal(t, 1, dataSource.called) assert.Equal(t, val, v, "cache should get correct data") time.Sleep(fakeCacheTTL) v, err = cache.Get(key) assert.NoError(t, err) + assert.Equal(t, 2, dataSource.called) assert.Equal(t, val, v, "cache should get correct data even after expired") } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index d2bb36fe81a..4c295c1a7cb 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -167,6 +167,12 @@ func (az *Cloud) getSecurityGroup() (nsg network.SecurityGroup, err error) { func (az *Cloud) newVMCache() (*timedCache, error) { getter := func(key string) (interface{}, error) { + // Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView + // request. If we first send an InstanceView request and then a non InstanceView request, the second + // request will still hit throttling. This is what happens now for cloud controller manager: In this + // case we do get instance view every time to fulfill the azure_zones requirement without hitting + // throttling. + // Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed vm, err := az.VirtualMachinesClient.Get(az.ResourceGroup, key, compute.InstanceView) exists, realErr := checkResourceExistsFromError(err) if realErr != nil {