diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index 241920e5f04..1f6983d8594 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -135,6 +135,11 @@ type Cloud struct { VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient + vmCache *timedCache + lbCache *timedCache + nsgCache *timedCache + rtCache *timedCache + *BlobDiskController *ManagedDiskController *controllerCommon @@ -244,6 +249,26 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az.vmSet = newAvailabilitySet(&az) } + az.vmCache, err = az.newVMCache() + if err != nil { + return nil, err + } + + az.lbCache, err = az.newLBCache() + if err != nil { + return nil, err + } + + az.nsgCache, err = az.newNSGCache() + if err != nil { + return nil, err + } + + az.rtCache, err = az.newRouteTableCache() + if err != nil { + return nil, err + } + if err := initDiskControllers(&az); err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go index 5d9a2af2ebb..39d2a291b5a 100644 --- a/pkg/cloudprovider/providers/azure/azure_backoff.go +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -131,7 +131,12 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error { resp := <-respChan err := <-errChan glog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) - return processRetryResponse(resp.Response, err) + done, err := processRetryResponse(resp.Response, err) + if done && err == nil { + // Invalidate the cache right after updating + az.nsgCache.Delete(*sg.Name) + } + return done, err }) } @@ -142,7 +147,12 @@ func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error { resp := <-respChan err := <-errChan glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name) - return processRetryResponse(resp.Response, err) + done, err := processRetryResponse(resp.Response, err) + if done && err == nil { + // Invalidate the cache right after updating + az.lbCache.Delete(*lb.Name) + } + return done, err }) } @@ -283,7 +293,12 @@ func (az *Cloud) DeleteLBWithRetry(lbName string) error { respChan, errChan := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) resp := <-respChan err := <-errChan - return processRetryResponse(resp, err) + done, err := processRetryResponse(resp, err) + if done && err == nil { + // Invalidate the cache right after deleting + az.lbCache.Delete(lbName) + } + return done, err }) } diff --git a/pkg/cloudprovider/providers/azure/azure_cache.go b/pkg/cloudprovider/providers/azure/azure_cache.go index 8e416601ad8..fd69eadbff2 100644 --- a/pkg/cloudprovider/providers/azure/azure_cache.go +++ b/pkg/cloudprovider/providers/azure/azure_cache.go @@ -17,40 +17,58 @@ limitations under the License. package azure import ( + "fmt" "sync" "time" "k8s.io/client-go/tools/cache" ) -type timedcacheEntry struct { +// getFunc defines a getter function for timedCache. +type getFunc func(key string) (interface{}, error) + +// cacheEntry is the internal structure stores inside TTLStore. +type cacheEntry struct { key string data interface{} + + // The lock to ensure not updating same entry simultaneously. + lock sync.Mutex } -type timedcache struct { - store cache.Store - lock sync.Mutex -} - -// ttl time.Duration -func newTimedcache(ttl time.Duration) timedcache { - return timedcache{ - store: cache.NewTTLStore(cacheKeyFunc, ttl), - } -} - +// cacheKeyFunc defines the key function required in TTLStore. func cacheKeyFunc(obj interface{}) (string, error) { - return obj.(*timedcacheEntry).key, nil + return obj.(*cacheEntry).key, nil } -func (t *timedcache) GetOrCreate(key string, createFunc func() interface{}) (interface{}, error) { +// timedCache is a cache with TTL. +type timedCache struct { + store cache.Store + lock sync.Mutex + getter getFunc +} + +// newTimedcache creates a new timedCache. +func newTimedcache(ttl time.Duration, getter getFunc) (*timedCache, error) { + if getter == nil { + return nil, fmt.Errorf("getter is not provided") + } + + return &timedCache{ + getter: getter, + store: cache.NewTTLStore(cacheKeyFunc, ttl), + }, nil +} + +// getInternal returns cacheEntry by key. If the key is not cached yet, +// it returns a cacheEntry with nil data. +func (t *timedCache) getInternal(key string) (*cacheEntry, error) { entry, exists, err := t.store.GetByKey(key) if err != nil { return nil, err } if exists { - return (entry.(*timedcacheEntry)).data, nil + return entry.(*cacheEntry), nil } t.lock.Lock() @@ -60,22 +78,47 @@ func (t *timedcache) GetOrCreate(key string, createFunc func() interface{}) (int return nil, err } if exists { - return (entry.(*timedcacheEntry)).data, nil + return entry.(*cacheEntry), nil } - if createFunc == nil { - return nil, nil - } - created := createFunc() - t.store.Add(&timedcacheEntry{ + // Still not found, add new entry with nil data. + // Note the data will be filled later by getter. + newEntry := &cacheEntry{ key: key, - data: created, - }) - return created, nil + data: nil, + } + t.store.Add(newEntry) + return newEntry, nil } -func (t *timedcache) Delete(key string) { - _ = t.store.Delete(&timedcacheEntry{ +// Get returns the requested item by key. +func (t *timedCache) Get(key string) (interface{}, error) { + entry, err := t.getInternal(key) + if err != nil { + return nil, err + } + + // Data is still not cached yet, cache it by getter. + if entry.data == nil { + entry.lock.Lock() + defer entry.lock.Unlock() + + if entry.data == nil { + data, err := t.getter(key) + if err != nil { + return nil, err + } + + entry.data = data + } + } + + return entry.data, nil +} + +// Delete removes an item from the cache. +func (t *timedCache) Delete(key string) error { + return t.store.Delete(&cacheEntry{ key: key, }) } diff --git a/pkg/cloudprovider/providers/azure/azure_cache_test.go b/pkg/cloudprovider/providers/azure/azure_cache_test.go index 0ac26d2e98a..574d2cb119b 100644 --- a/pkg/cloudprovider/providers/azure/azure_cache_test.go +++ b/pkg/cloudprovider/providers/azure/azure_cache_test.go @@ -17,80 +17,144 @@ limitations under the License. package azure import ( - "sync/atomic" + "fmt" + "sync" "testing" "time" + + "github.com/stretchr/testify/assert" ) -func TestCacheReturnsSameObject(t *testing.T) { - type cacheTestingStruct struct{} - c := newTimedcache(1 * time.Minute) - o1 := cacheTestingStruct{} - get1, _ := c.GetOrCreate("b1", func() interface{} { - return o1 - }) - o2 := cacheTestingStruct{} - get2, _ := c.GetOrCreate("b1", func() interface{} { - return o2 - }) - if get1 != get2 { - t.Error("Get not equal") +var ( + fakeCacheTTL = 2 * time.Second +) + +type fakeDataObj struct{} + +type fakeDataSource struct { + called int + data map[string]*fakeDataObj + lock sync.Mutex +} + +func (fake *fakeDataSource) get(key string) (interface{}, error) { + fake.lock.Lock() + defer fake.lock.Unlock() + + fake.called = fake.called + 1 + if v, ok := fake.data[key]; ok { + return v, nil + } + + return nil, nil +} + +func (fake *fakeDataSource) set(data map[string]*fakeDataObj) { + fake.lock.Lock() + defer fake.lock.Unlock() + + fake.data = data + fake.called = 0 +} + +func newFakeCache(t *testing.T) (*fakeDataSource, *timedCache) { + dataSource := &fakeDataSource{ + data: make(map[string]*fakeDataObj), + } + getter := dataSource.get + cache, err := newTimedcache(fakeCacheTTL, getter) + assert.NoError(t, err) + return dataSource, cache +} + +func TestCacheGet(t *testing.T) { + val := &fakeDataObj{} + cases := []struct { + name string + data map[string]*fakeDataObj + key string + expected interface{} + }{ + { + name: "cache should return nil for empty data source", + key: "key1", + expected: nil, + }, + { + name: "cache should return nil for non exist key", + data: map[string]*fakeDataObj{"key2": val}, + key: "key1", + expected: nil, + }, + { + name: "cache should return data for existing key", + data: map[string]*fakeDataObj{"key1": val}, + key: "key1", + expected: val, + }, + } + + for _, c := range cases { + dataSource, cache := newFakeCache(t) + dataSource.set(c.data) + val, err := cache.Get(c.key) + assert.NoError(t, err, c.name) + assert.Equal(t, c.expected, val, c.name) } } -func TestCacheCallsCreateFuncOnce(t *testing.T) { - var callsCount uint32 - f1 := func() interface{} { - atomic.AddUint32(&callsCount, 1) - return 1 - } - c := newTimedcache(500 * time.Millisecond) - for index := 0; index < 20; index++ { - _, _ = c.GetOrCreate("b1", f1) +func TestCacheGetError(t *testing.T) { + getError := fmt.Errorf("getError") + getter := func(key string) (interface{}, error) { + return nil, getError } + cache, err := newTimedcache(fakeCacheTTL, getter) + assert.NoError(t, err) - if callsCount != 1 { - t.Error("Count not match") - } - time.Sleep(500 * time.Millisecond) - c.GetOrCreate("b1", f1) - if callsCount != 2 { - t.Error("Count not match") - } -} - -func TestCacheExpires(t *testing.T) { - f1 := func() interface{} { - return 1 - } - c := newTimedcache(500 * time.Millisecond) - get1, _ := c.GetOrCreate("b1", f1) - if get1 != 1 { - t.Error("Value not equal") - } - time.Sleep(500 * time.Millisecond) - get1, _ = c.GetOrCreate("b1", nil) - if get1 != nil { - t.Error("value not expired") - } + val, err := cache.Get("key") + assert.Error(t, err) + assert.Equal(t, getError, err) + assert.Nil(t, val) } func TestCacheDelete(t *testing.T) { - f1 := func() interface{} { - return 1 - } - c := newTimedcache(500 * time.Millisecond) - get1, _ := c.GetOrCreate("b1", f1) - if get1 != 1 { - t.Error("Value not equal") - } - get1, _ = c.GetOrCreate("b1", nil) - if get1 != 1 { - t.Error("Value not equal") - } - c.Delete("b1") - get1, _ = c.GetOrCreate("b1", nil) - if get1 != nil { - t.Error("value not deleted") + key := "key1" + val := &fakeDataObj{} + data := map[string]*fakeDataObj{ + key: val, } + dataSource, cache := newFakeCache(t) + dataSource.set(data) + + v, err := cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, val, v, "cache should get correct data") + + dataSource.set(nil) + cache.Delete(key) + v, err = cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, 1, dataSource.called) + assert.Equal(t, nil, v, "cache should get nil after data is removed") +} + +func TestCacheExpired(t *testing.T) { + key := "key1" + val := &fakeDataObj{} + data := map[string]*fakeDataObj{ + key: val, + } + dataSource, cache := newFakeCache(t) + dataSource.set(data) + + v, err := cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, 1, dataSource.called) + assert.Equal(t, val, v, "cache should get correct data") + + time.Sleep(fakeCacheTTL) + v, err = cache.Get(key) + assert.NoError(t, err) + assert.Equal(t, 2, dataSource.called) + assert.Equal(t, val, v, "cache should get correct data even after expired") } diff --git a/pkg/cloudprovider/providers/azure/azure_controllerCommon.go b/pkg/cloudprovider/providers/azure/azure_controllerCommon.go index 463cd04f4a8..e7d70687411 100644 --- a/pkg/cloudprovider/providers/azure/azure_controllerCommon.go +++ b/pkg/cloudprovider/providers/azure/azure_controllerCommon.go @@ -124,7 +124,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri } else { glog.V(4).Info("azureDisk - azure attach succeeded") // Invalidate the cache right after updating - vmCache.Delete(vmName) + c.cloud.vmCache.Delete(vmName) } return err } @@ -183,7 +183,7 @@ func (c *controllerCommon) DetachDiskByName(diskName, diskURI string, nodeName t } else { glog.V(4).Info("azureDisk - azure disk detach succeeded") // Invalidate the cache right after updating - vmCache.Delete(vmName) + c.cloud.vmCache.Delete(vmName) } return err } diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index 8b4387cf63a..65b6f9c6287 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -819,7 +819,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, ports = []v1.ServicePort{} } - sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "") + sg, err := az.getSecurityGroup() if err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_routes.go b/pkg/cloudprovider/providers/azure/azure_routes.go index e08ca912581..c78de23d730 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes.go +++ b/pkg/cloudprovider/providers/azure/azure_routes.go @@ -98,10 +98,9 @@ func (az *Cloud) createRouteTable() error { return err } - glog.V(10).Infof("RouteTablesClient.Get(%q): start", az.RouteTableName) - _, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "") - glog.V(10).Infof("RouteTablesClient.Get(%q): end", az.RouteTableName) - return err + // Invalidate the cache right after updating + az.rtCache.Delete(az.RouteTableName) + return nil } // CreateRoute creates the described managed route diff --git a/pkg/cloudprovider/providers/azure/azure_routes_test.go b/pkg/cloudprovider/providers/azure/azure_routes_test.go index 02041f0fa22..31f31ee4eb4 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes_test.go +++ b/pkg/cloudprovider/providers/azure/azure_routes_test.go @@ -37,6 +37,7 @@ func TestCreateRoute(t *testing.T) { Location: "location", }, } + cloud.rtCache, _ = cloud.newRouteTableCache() expectedTable := network.RouteTable{ Name: &cloud.RouteTableName, Location: &cloud.Location, diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 21fcf0bbe94..e638e4522d4 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -878,6 +878,10 @@ func getTestCloud() (az *Cloud) { az.VirtualMachineScaleSetVMsClient = newFakeVirtualMachineScaleSetVMsClient() az.VirtualMachinesClient = newFakeAzureVirtualMachinesClient() az.vmSet = newAvailabilitySet(az) + az.vmCache, _ = az.newVMCache() + az.lbCache, _ = az.newLBCache() + az.nsgCache, _ = az.newNSGCache() + az.rtCache, _ = az.newRouteTableCache() return az } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index e37ac8ca950..4c295c1a7cb 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -17,19 +17,25 @@ limitations under the License. package azure import ( + "fmt" "net/http" - "sync" "time" "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" "github.com/Azure/go-autorest/autorest" - "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/cloudprovider" ) +var ( + vmCacheTTL = time.Minute + lbCacheTTL = 2 * time.Minute + nsgCacheTTL = 2 * time.Minute + rtCacheTTL = 2 * time.Minute +) + // checkExistsFromError inspects an error and returns a true if err is nil, // false if error is an autorest.Error with StatusCode=404 and will return the // error back if error is another status code or another type of error. @@ -60,75 +66,34 @@ func ignoreStatusNotFoundFromError(err error) error { return err } -// cache used by getVirtualMachine -// 15s for expiration duration -var vmCache = newTimedcache(15 * time.Second) - -type vmRequest struct { - lock *sync.Mutex - vm *compute.VirtualMachine -} - /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache /// The service side has throttling control that delays responses if there're multiple requests onto certain vm /// resource request in short period. func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, err error) { vmName := string(nodeName) - - cachedRequest, err := vmCache.GetOrCreate(vmName, func() interface{} { - return &vmRequest{ - lock: &sync.Mutex{}, - vm: nil, - } - }) + cachedVM, err := az.vmCache.Get(vmName) if err != nil { - return compute.VirtualMachine{}, err - } - request := cachedRequest.(*vmRequest) - - if request.vm == nil { - request.lock.Lock() - defer request.lock.Unlock() - if request.vm == nil { - // Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView - // request. If we first send an InstanceView request and then a non InstanceView request, the second - // request will still hit throttling. This is what happens now for cloud controller manager: In this - // case we do get instance view every time to fulfill the azure_zones requirement without hitting - // throttling. - // Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed - vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, vmName, compute.InstanceView) - exists, realErr := checkResourceExistsFromError(err) - if realErr != nil { - return vm, realErr - } - - if !exists { - return vm, cloudprovider.InstanceNotFound - } - - request.vm = &vm - } - return *request.vm, nil + return vm, err } - glog.V(6).Infof("getVirtualMachine hits cache for(%s)", vmName) - return *request.vm, nil + if cachedVM == nil { + return vm, cloudprovider.InstanceNotFound + } + + return *(cachedVM.(*compute.VirtualMachine)), nil } func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) { - var realErr error - - routeTable, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "") - exists, realErr = checkResourceExistsFromError(err) - if realErr != nil { - return routeTable, false, realErr + cachedRt, err := az.rtCache.Get(az.RouteTableName) + if err != nil { + return routeTable, false, err } - if !exists { + if cachedRt == nil { return routeTable, false, nil } - return routeTable, exists, err + return *(cachedRt.(*network.RouteTable)), true, nil } func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string) (pip network.PublicIPAddress, exists bool, err error) { @@ -175,17 +140,105 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet } func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) { - var realErr error - - lb, err = az.LoadBalancerClient.Get(az.ResourceGroup, name, "") - exists, realErr = checkResourceExistsFromError(err) - if realErr != nil { - return lb, false, realErr + cachedLB, err := az.lbCache.Get(name) + if err != nil { + return lb, false, err } - if !exists { + if cachedLB == nil { return lb, false, nil } - return lb, exists, err + return *(cachedLB.(*network.LoadBalancer)), true, nil +} + +func (az *Cloud) getSecurityGroup() (nsg network.SecurityGroup, err error) { + securityGroup, err := az.nsgCache.Get(az.SecurityGroupName) + if err != nil { + return nsg, err + } + + if securityGroup == nil { + return nsg, fmt.Errorf("nsg %q not found", az.SecurityGroupName) + } + + return *(securityGroup.(*network.SecurityGroup)), nil +} + +func (az *Cloud) newVMCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + // Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView + // request. If we first send an InstanceView request and then a non InstanceView request, the second + // request will still hit throttling. This is what happens now for cloud controller manager: In this + // case we do get instance view every time to fulfill the azure_zones requirement without hitting + // throttling. + // Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed + vm, err := az.VirtualMachinesClient.Get(az.ResourceGroup, key, compute.InstanceView) + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &vm, nil + } + + return newTimedcache(vmCacheTTL, getter) +} + +func (az *Cloud) newLBCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + lb, err := az.LoadBalancerClient.Get(az.ResourceGroup, key, "") + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &lb, nil + } + + return newTimedcache(lbCacheTTL, getter) +} + +func (az *Cloud) newNSGCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + nsg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, key, "") + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &nsg, nil + } + + return newTimedcache(nsgCacheTTL, getter) +} + +func (az *Cloud) newRouteTableCache() (*timedCache, error) { + getter := func(key string) (interface{}, error) { + rt, err := az.RouteTablesClient.Get(az.ResourceGroup, key, "") + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return nil, realErr + } + + if !exists { + return nil, nil + } + + return &rt, nil + } + + return newTimedcache(rtCacheTTL, getter) }