From cebdd8518655cf7c883a227037985f05fe809de2 Mon Sep 17 00:00:00 2001 From: Anish Ramasekar Date: Tue, 8 Oct 2019 13:13:36 -0700 Subject: [PATCH] add allowunsafe read update common controller add comment update to cached data add cacheReadType type update logic for disk reconcile loop to read from cache update delete cache for node review feedback --- .../azure/azure_backoff.go | 4 +- .../azure/azure_cache.go | 76 +++++++++++++------ .../azure/azure_cache_test.go | 56 ++++++++++++-- .../azure/azure_controller_common.go | 28 ++++--- .../azure/azure_controller_standard.go | 8 +- .../azure/azure_controller_standard_test.go | 25 +++++- .../azure/azure_controller_vmss.go | 8 +- .../azure/azure_fakes.go | 2 +- .../azure/azure_instance_metadata.go | 2 +- .../azure/azure_loadbalancer.go | 6 +- .../azure/azure_routes.go | 4 +- .../azure/azure_standard.go | 12 +-- .../azure/azure_vmsets.go | 2 +- .../azure/azure_vmss.go | 38 +++++----- .../azure/azure_vmss_cache.go | 8 +- .../azure/azure_vmss_cache_test.go | 6 +- .../azure/azure_wrap.go | 16 ++-- 17 files changed, 203 insertions(+), 98 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go index eeefe41c005..a45905f7a30 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go @@ -65,11 +65,11 @@ func (az *Cloud) Event(obj runtime.Object, eventtype, reason, message string) { } // GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry -func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.VirtualMachine, error) { +func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt cacheReadType) (compute.VirtualMachine, error) { var machine compute.VirtualMachine var retryErr error err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) { - machine, retryErr = az.getVirtualMachine(name) + machine, retryErr = az.getVirtualMachine(name, crt) if retryErr == cloudprovider.InstanceNotFound { return true, cloudprovider.InstanceNotFound } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go index 90bc7e163ad..f5216e61ae3 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go @@ -26,6 +26,20 @@ import ( "k8s.io/client-go/tools/cache" ) +// cacheReadType defines the read type for cache data +type cacheReadType int + +const ( + // cachedData returns data from cache if cache entry not expired + // if cache entry expired, then it will refetch the data using getter + // save the entry in cache and then return + cachedData cacheReadType = iota + // allowUnsafeRead returns data from cache even if the cache entry is + // active/expired. If entry doesn't exist in cache, then data is fetched + // using getter, saved in cache and returned + allowUnsafeRead +) + // getFunc defines a getter function for timedCache. type getFunc func(key string) (interface{}, error) @@ -36,6 +50,8 @@ type cacheEntry struct { // The lock to ensure not updating same entry simultaneously. lock sync.Mutex + // time when entry was fetched and created + createdOn time.Time } // cacheKeyFunc defines the key function required in TTLStore. 
@@ -48,6 +64,7 @@ type timedCache struct { store cache.Store lock sync.Mutex getter getFunc + ttl time.Duration } // newTimedcache creates a new timedCache. @@ -58,7 +75,11 @@ func newTimedcache(ttl time.Duration, getter getFunc) (*timedCache, error) { return &timedCache{ getter: getter, - store: cache.NewTTLStore(cacheKeyFunc, ttl), + // switch to using NewStore instead of NewTTLStore so that we can + // reuse entries for calls that are fine with reading expired/stalled data. + // with NewTTLStore, entries are not returned if they have already expired. + store: cache.NewStore(cacheKeyFunc), + ttl: ttl, }, nil } @@ -69,16 +90,12 @@ func (t *timedCache) getInternal(key string) (*cacheEntry, error) { if err != nil { return nil, err } - if exists { - return entry.(*cacheEntry), nil - } - + // lock here to ensure if entry doesn't exist, we add a new entry + // avoiding overwrites t.lock.Lock() defer t.lock.Unlock() - entry, exists, err = t.store.GetByKey(key) - if err != nil { - return nil, err - } + + // if entry exists, return the entry if exists { return entry.(*cacheEntry), nil } @@ -94,26 +111,38 @@ func (t *timedCache) getInternal(key string) (*cacheEntry, error) { } // Get returns the requested item by key. -func (t *timedCache) Get(key string) (interface{}, error) { +func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) { entry, err := t.getInternal(key) if err != nil { return nil, err } - // Data is still not cached yet, cache it by getter. - if entry.data == nil { - entry.lock.Lock() - defer entry.lock.Unlock() + entry.lock.Lock() + defer entry.lock.Unlock() - if entry.data == nil { - data, err := t.getter(key) - if err != nil { - return nil, err - } - - entry.data = data + // entry exists + if entry.data != nil { + // allow dirty, so return data even if expired + if crt == allowUnsafeRead { + return entry.data, nil + } + // if cached data is not expired, return cached data + if time.Since(entry.createdOn) < t.ttl && crt == cachedData { + return entry.data, nil } } + // Data is not cached yet or cache data is expired, cache it by getter. + // entry is locked before getting to ensure concurrent gets don't result in + // multiple ARM calls. + data, err := t.getter(key) + if err != nil { + return nil, err + } + + // set the data in cache and also set the last update time + // to now as the data was recently fetched + entry.data = data + entry.createdOn = time.Now().UTC() return entry.data, nil } @@ -129,7 +158,8 @@ func (t *timedCache) Delete(key string) error { // It is only used for testing. 
func (t *timedCache) Set(key string, data interface{}) { t.store.Add(&cacheEntry{ - key: key, - data: data, + key: key, + data: data, + createdOn: time.Now().UTC(), }) } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go index 12a33a14125..af82027bda0 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go @@ -99,7 +99,7 @@ func TestCacheGet(t *testing.T) { for _, c := range cases { dataSource, cache := newFakeCache(t) dataSource.set(c.data) - val, err := cache.Get(c.key) + val, err := cache.Get(c.key, cachedData) assert.NoError(t, err, c.name) assert.Equal(t, c.expected, val, c.name) } @@ -113,7 +113,7 @@ func TestCacheGetError(t *testing.T) { cache, err := newTimedcache(fakeCacheTTL, getter) assert.NoError(t, err) - val, err := cache.Get("key") + val, err := cache.Get("key", cachedData) assert.Error(t, err) assert.Equal(t, getError, err) assert.Nil(t, val) @@ -128,13 +128,13 @@ func TestCacheDelete(t *testing.T) { dataSource, cache := newFakeCache(t) dataSource.set(data) - v, err := cache.Get(key) + v, err := cache.Get(key, cachedData) assert.NoError(t, err) assert.Equal(t, val, v, "cache should get correct data") dataSource.set(nil) cache.Delete(key) - v, err = cache.Get(key) + v, err = cache.Get(key, cachedData) assert.NoError(t, err) assert.Equal(t, 1, dataSource.called) assert.Equal(t, nil, v, "cache should get nil after data is removed") @@ -149,14 +149,58 @@ func TestCacheExpired(t *testing.T) { dataSource, cache := newFakeCache(t) dataSource.set(data) - v, err := cache.Get(key) + v, err := cache.Get(key, cachedData) assert.NoError(t, err) assert.Equal(t, 1, dataSource.called) assert.Equal(t, val, v, "cache should get correct data") time.Sleep(fakeCacheTTL) - v, err = cache.Get(key) + v, err = cache.Get(key, cachedData) assert.NoError(t, err) assert.Equal(t, 2, dataSource.called) assert.Equal(t, val, v, "cache should get correct data even after expired") } + +func TestCacheAllowUnsafeRead(t *testing.T) { + key := "key1" + val := &fakeDataObj{} + data := map[string]*fakeDataObj{ + key: val, + } + dataSource, cache := newFakeCache(t) + dataSource.set(data) + + v, err := cache.Get(key, cachedData) + assert.NoError(t, err) + assert.Equal(t, 1, dataSource.called) + assert.Equal(t, val, v, "cache should get correct data") + + time.Sleep(fakeCacheTTL) + v, err = cache.Get(key, allowUnsafeRead) + assert.NoError(t, err) + assert.Equal(t, 1, dataSource.called) + assert.Equal(t, val, v, "cache should return expired as allow unsafe read is allowed") +} + +func TestCacheNoConcurrentGet(t *testing.T) { + key := "key1" + val := &fakeDataObj{} + data := map[string]*fakeDataObj{ + key: val, + } + dataSource, cache := newFakeCache(t) + dataSource.set(data) + + time.Sleep(fakeCacheTTL) + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go cache.Get(key, cachedData) + wg.Done() + } + v, err := cache.Get(key, cachedData) + wg.Wait() + assert.NoError(t, err) + assert.Equal(t, 1, dataSource.called) + assert.Equal(t, val, v, "cache should get correct data") +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go index 39e7073fffa..e16539ae4c0 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go +++ 
b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go @@ -69,7 +69,7 @@ type controllerCommon struct { } // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type. -func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error) { +func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt cacheReadType) (VMSet, error) { // 1. vmType is standard, return cloud.vmSet directly. if c.cloud.VMType == vmTypeStandard { return c.cloud.vmSet, nil @@ -82,7 +82,7 @@ func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error) } // 3. If the node is managed by availability set, then return ss.availabilitySet. - managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName)) + managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName), crt) if err != nil { return nil, err } @@ -124,14 +124,14 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri } } - vmset, err := c.getNodeVMSet(nodeName) + vmset, err := c.getNodeVMSet(nodeName, allowUnsafeRead) if err != nil { return -1, err } instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName) if err != nil { - klog.Warningf("failed to get azure instance id (%v)", err) + klog.Warningf("failed to get azure instance id (%v) for node %s", err, nodeName) return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) } @@ -162,7 +162,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) } - vmset, err := c.getNodeVMSet(nodeName) + vmset, err := c.getNodeVMSet(nodeName, allowUnsafeRead) if err != nil { return err } @@ -197,18 +197,20 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N } // getNodeDataDisks invokes vmSet interfaces to get data disks for the node. -func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { - vmset, err := c.getNodeVMSet(nodeName) +func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { + vmset, err := c.getNodeVMSet(nodeName, crt) if err != nil { return nil, err } - return vmset.GetDataDisks(nodeName) + return vmset.GetDataDisks(nodeName, crt) } // GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI. func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) { - disks, err := c.getNodeDataDisks(nodeName) + // getNodeDataDisks need to fetch the cached data/fresh data if cache expired here + // to ensure we get LUN based on latest entry. + disks, err := c.getNodeDataDisks(nodeName, cachedData) if err != nil { klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) return -1, err @@ -228,7 +230,7 @@ func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.N // GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used. 
func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) { - disks, err := c.getNodeDataDisks(nodeName) + disks, err := c.getNodeDataDisks(nodeName, cachedData) if err != nil { klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) return -1, err @@ -255,7 +257,11 @@ func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.N attached[diskName] = false } - disks, err := c.getNodeDataDisks(nodeName) + // doing stalled read for getNodeDataDisks to ensure we don't call ARM + // for every reconcile call. The cache is invalidated after Attach/Detach + // disk. So the new entry will be fetched and cached the first time reconcile + // loop runs after the Attach/Disk OP which will reflect the latest model. + disks, err := c.getNodeDataDisks(nodeName, allowUnsafeRead) if err != nil { if err == cloudprovider.InstanceNotFound { // if host doesn't exist, no need to detach diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go index 7490d93fcf4..7cdf980b2fa 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go @@ -31,7 +31,7 @@ import ( // AttachDisk attaches a vhd to vm // the vhd must exist, can be identified by diskName, diskURI, and lun. func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error { - vm, err := as.getVirtualMachine(nodeName) + vm, err := as.getVirtualMachine(nodeName, cachedData) if err != nil { return err } @@ -102,7 +102,7 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri // DetachDisk detaches a disk from host // the vhd can be identified by diskName or diskURI func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) { - vm, err := as.getVirtualMachine(nodeName) + vm, err := as.getVirtualMachine(nodeName, cachedData) if err != nil { // if host doesn't exist, no need to detach klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI) @@ -155,8 +155,8 @@ func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.N } // GetDataDisks gets a list of data disks attached to the node. 
-func (as *availabilitySet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { - vm, err := as.getVirtualMachine(nodeName) +func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { + vm, err := as.getVirtualMachine(nodeName, crt) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go index 630a6eb9473..f8d7c15381c 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go @@ -20,6 +20,7 @@ package azure import ( "testing" + "time" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute" "github.com/Azure/go-autorest/autorest/to" @@ -99,12 +100,14 @@ func TestGetDataDisks(t *testing.T) { nodeName types.NodeName expectedDataDisks []compute.DataDisk expectedError bool + crt cacheReadType }{ { desc: "an error shall be returned if there's no corresponding vm", nodeName: "vm2", expectedDataDisks: nil, expectedError: true, + crt: cachedData, }, { desc: "correct list of data disks shall be returned if everything is good", @@ -116,6 +119,19 @@ func TestGetDataDisks(t *testing.T) { }, }, expectedError: false, + crt: cachedData, + }, + { + desc: "correct list of data disks shall be returned if everything is good", + nodeName: "vm1", + expectedDataDisks: []compute.DataDisk{ + { + Lun: to.Int32Ptr(0), + Name: to.StringPtr("disk1"), + }, + }, + expectedError: false, + crt: allowUnsafeRead, }, } for i, test := range testCases { @@ -123,8 +139,15 @@ func TestGetDataDisks(t *testing.T) { vmSet := testCloud.vmSet setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false) - dataDisks, err := vmSet.GetDataDisks(test.nodeName) + dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt) assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc) assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc) + + if test.crt == allowUnsafeRead { + time.Sleep(fakeCacheTTL) + dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt) + assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc) + assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc) + } } } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go index 0d8468dfd7d..4e899ef1c1b 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go @@ -32,7 +32,7 @@ import ( // the vhd must exist, can be identified by diskName, diskURI, and lun. 
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error { vmName := mapNodeNameToVMName(nodeName) - ssName, instanceID, vm, err := ss.getVmssVM(vmName) + ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData) if err != nil { return err } @@ -109,7 +109,7 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod // the vhd can be identified by diskName or diskURI func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) { vmName := mapNodeNameToVMName(nodeName) - ssName, instanceID, vm, err := ss.getVmssVM(vmName) + ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData) if err != nil { return nil, err } @@ -167,8 +167,8 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName } // GetDataDisks gets a list of data disks attached to the node. -func (ss *scaleSet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { - _, _, vm, err := ss.getVmssVM(string(nodeName)) +func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { + _, _, vm, err := ss.getVmssVM(string(nodeName), crt) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go index f4df42ad6c6..30799917835 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go @@ -950,7 +950,7 @@ func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName return nil, fmt.Errorf("unimplemented") } -func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) { +func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { return nil, fmt.Errorf("unimplemented") } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go index 187e77f679f..40df24eab86 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go @@ -145,7 +145,7 @@ func (ims *InstanceMetadataService) getInstanceMetadata(key string) (interface{} // GetMetadata gets instance metadata from cache. func (ims *InstanceMetadataService) GetMetadata() (*InstanceMetadata, error) { - cache, err := ims.imsCache.Get(metadataCacheKey) + cache, err := ims.imsCache.Get(metadataCacheKey, cachedData) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go index 05aa5adfa4d..7486d7c6eca 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go @@ -962,7 +962,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, if isInternal { // Refresh updated lb which will be used later in other places. 
- newLB, exist, err := az.getAzureLoadBalancer(lbName) + newLB, exist, err := az.getAzureLoadBalancer(lbName, cachedData) if err != nil { klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err) return nil, err @@ -1125,7 +1125,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, ports = []v1.ServicePort{} } - sg, err := az.getSecurityGroup() + sg, err := az.getSecurityGroup(cachedData) if err != nil { return nil, err } @@ -1466,7 +1466,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa } if lbName != "" { - loadBalancer, _, err := az.getAzureLoadBalancer(lbName) + loadBalancer, _, err := az.getAzureLoadBalancer(lbName, cachedData) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go index 3cc689fa597..9077eb5ad5b 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go @@ -46,7 +46,7 @@ const ( // ListRoutes lists all managed routes that belong to the specified clusterName func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName) - routeTable, existsRouteTable, err := az.getRouteTable() + routeTable, existsRouteTable, err := az.getRouteTable(cachedData) routes, err := processRoutes(routeTable, existsRouteTable, err) if err != nil { return nil, err @@ -102,7 +102,7 @@ func processRoutes(routeTable network.RouteTable, exists bool, err error) ([]*cl } func (az *Cloud) createRouteTableIfNotExists(clusterName string, kubeRoute *cloudprovider.Route) error { - if _, existsRouteTable, err := az.getRouteTable(); err != nil { + if _, existsRouteTable, err := az.getRouteTable(cachedData); err != nil { klog.V(2).Infof("createRouteTableIfNotExists error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) return err } else if existsRouteTable { diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go index 444584edf37..b4f716160ba 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go @@ -375,14 +375,14 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) var machine compute.VirtualMachine var err error - machine, err = as.getVirtualMachine(types.NodeName(name)) + machine, err = as.getVirtualMachine(types.NodeName(name), allowUnsafeRead) if err == cloudprovider.InstanceNotFound { return "", cloudprovider.InstanceNotFound } if err != nil { if as.CloudProviderBackoff { klog.V(2).Infof("GetInstanceIDByNodeName(%s) backing off", name) - machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name)) + machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name), allowUnsafeRead) if err != nil { klog.V(2).Infof("GetInstanceIDByNodeName(%s) abort backoff", name) return "", err @@ -403,7 +403,7 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) // GetPowerStatusByNodeName returns the power state of the specified node. 
func (as *availabilitySet) GetPowerStatusByNodeName(name string) (powerState string, err error) { - vm, err := as.getVirtualMachine(types.NodeName(name)) + vm, err := as.getVirtualMachine(types.NodeName(name), cachedData) if err != nil { return powerState, err } @@ -436,7 +436,7 @@ func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.Nod // GetInstanceTypeByNodeName gets the instance type by node name. func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) { - machine, err := as.getVirtualMachine(types.NodeName(name)) + machine, err := as.getVirtualMachine(types.NodeName(name), allowUnsafeRead) if err != nil { klog.Errorf("as.GetInstanceTypeByNodeName(%s) failed: as.getVirtualMachine(%s) err=%v", name, name, err) return "", err @@ -448,7 +448,7 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error // GetZoneByNodeName gets availability zone for the specified node. If the node is not running // with availability zone, then it returns fault domain. func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { - vm, err := as.getVirtualMachine(types.NodeName(name)) + vm, err := as.getVirtualMachine(types.NodeName(name), allowUnsafeRead) if err != nil { return cloudprovider.Zone{}, err } @@ -649,7 +649,7 @@ func extractResourceGroupByNicID(nicID string) (string, error) { func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) { var machine compute.VirtualMachine - machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName)) + machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName), cachedData) if err != nil { klog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName) return network.Interface{}, err diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go index 26b6e1292c8..2d7347ae113 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go @@ -70,7 +70,7 @@ type VMSet interface { // DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI. DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) // GetDataDisks gets a list of data disks attached to the node. - GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) + GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) // GetPowerStatusByNodeName returns the power state of the specified node. GetPowerStatusByNodeName(name string) (string, error) diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go index 5a4b657cad9..ffff81d8922 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go @@ -88,9 +88,9 @@ func newScaleSet(az *Cloud) (VMSet, error) { // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache. // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets. 
-func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) { +func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) { getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) { - cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) + cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) if err != nil { return "", "", nil, err } @@ -132,7 +132,7 @@ func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.Virtual // GetPowerStatusByNodeName returns the power state of the specified node. func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) { - _, _, vm, err := ss.getVmssVM(name) + _, _, vm, err := ss.getVmssVM(name, cachedData) if err != nil { return powerState, err } @@ -154,9 +154,9 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache. // The node must belong to one of scale sets. -func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (*compute.VirtualMachineScaleSetVM, error) { +func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) { getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) { - cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) + cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) if err != nil { return nil, false, err } @@ -203,7 +203,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI // It must return ("", cloudprovider.InstanceNotFound) if the instance does // not exist or is no longer running. func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { - managedByAS, err := ss.isNodeManagedByAvailabilitySet(name) + managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead) if err != nil { klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) return "", err @@ -213,7 +213,7 @@ func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { return ss.availabilitySet.GetInstanceIDByNodeName(name) } - _, _, vm, err := ss.getVmssVM(name) + _, _, vm, err := ss.getVmssVM(name, allowUnsafeRead) if err != nil { return "", err } @@ -247,7 +247,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, return ss.availabilitySet.GetNodeNameByProviderID(providerID) } - vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID) + vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, allowUnsafeRead) if err != nil { return "", err } @@ -262,7 +262,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, // GetInstanceTypeByNodeName gets the instance type by node name. 
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { - managedByAS, err := ss.isNodeManagedByAvailabilitySet(name) + managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead) if err != nil { klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) return "", err @@ -272,7 +272,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { return ss.availabilitySet.GetInstanceTypeByNodeName(name) } - _, _, vm, err := ss.getVmssVM(name) + _, _, vm, err := ss.getVmssVM(name, allowUnsafeRead) if err != nil { return "", err } @@ -287,7 +287,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { // GetZoneByNodeName gets availability zone for the specified node. If the node is not running // with availability zone, then it returns fault domain. func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { - managedByAS, err := ss.isNodeManagedByAvailabilitySet(name) + managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead) if err != nil { klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) return cloudprovider.Zone{}, err @@ -297,7 +297,7 @@ func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { return ss.availabilitySet.GetZoneByNodeName(name) } - _, _, vm, err := ss.getVmssVM(name) + _, _, vm, err := ss.getVmssVM(name, allowUnsafeRead) if err != nil { return cloudprovider.Zone{}, err } @@ -536,7 +536,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) { } nodeName := nodes[nx].Name - ssName, _, _, err := ss.getVmssVM(nodeName) + ssName, _, _, err := ss.getVmssVM(nodeName, cachedData) if err != nil { return nil, err } @@ -614,7 +614,7 @@ func extractResourceGroupByVMSSNicID(nicID string) (string, error) { // GetPrimaryInterface gets machine primary network interface by node name and vmSet. func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) { - managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName) + managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, cachedData) if err != nil { klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) return network.Interface{}, err @@ -624,7 +624,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err return ss.availabilitySet.GetPrimaryInterface(nodeName) } - ssName, instanceID, vm, err := ss.getVmssVM(nodeName) + ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cachedData) if err != nil { // VM is availability set, but not cached yet in availabilitySetNodesCache. if err == ErrorNotVmssInstance { @@ -747,7 +747,7 @@ func (ss *scaleSet) getConfigForScaleSetByIPFamily(config *compute.VirtualMachin func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID) vmName := mapNodeNameToVMName(nodeName) - ssName, instanceID, vm, err := ss.getVmssVM(vmName) + ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData) if err != nil { return err } @@ -1027,7 +1027,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac f := func() error { // Check whether the node is VMAS virtual machine. 
- managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName) + managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, cachedData) if err != nil { klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err) return err @@ -1068,7 +1068,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac // ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node. func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error { - ssName, instanceID, vm, err := ss.getVmssVM(nodeName) + ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cachedData) if err != nil { return err } @@ -1167,7 +1167,7 @@ func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (st resourceGroup := matches[1] scaleSetName := matches[2] instanceID := matches[3] - vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID) + vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, allowUnsafeRead) if err != nil { return "", err } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go index 398dfed0ec5..633a9106245 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go @@ -47,6 +47,7 @@ type vmssVirtualMachinesEntry struct { vmssName string instanceID string virtualMachine *compute.VirtualMachineScaleSetVM + lastUpdate time.Time } func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string { @@ -101,6 +102,7 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) { vmssName: ssName, instanceID: to.String(vm.InstanceID), virtualMachine: &vm, + lastUpdate: time.Now().UTC(), }) } } @@ -113,7 +115,7 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) { } func (ss *scaleSet) deleteCacheForNode(nodeName string) error { - cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) + cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, allowUnsafeRead) if err != nil { return err } @@ -150,8 +152,8 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) { return newTimedcache(availabilitySetNodesCacheTTL, getter) } -func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string) (bool, error) { - cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey) +func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt cacheReadType) (bool, error) { + cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey, crt) if err != nil { return false, err } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go index c6596d7bc64..a5998e53dcf 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go @@ -85,7 +85,7 @@ func TestVMSSVMCache(t *testing.T) { for i := range virtualMachines { vm := virtualMachines[i] vmName := to.String(vm.OsProfile.ComputerName) - ssName, instanceID, realVM, err := ss.getVmssVM(vmName) + ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cachedData) assert.NoError(t, err) assert.Equal(t, "vmss", ssName) assert.Equal(t, to.String(vm.InstanceID), instanceID) @@ 
-99,14 +99,14 @@ func TestVMSSVMCache(t *testing.T) { assert.NoError(t, err) // the VM should be removed from cache after deleteCacheForNode(). - cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey) + cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cachedData) assert.NoError(t, err) cachedVirtualMachines := cached.(*sync.Map) _, ok := cachedVirtualMachines.Load(vmName) assert.Equal(t, false, ok) // the VM should be get back after another cache refresh. - ssName, instanceID, realVM, err := ss.getVmssVM(vmName) + ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cachedData) assert.NoError(t, err) assert.Equal(t, "vmss", ssName) assert.Equal(t, to.String(vm.InstanceID), instanceID) diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go index 5da34a1f407..7443f86c211 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go @@ -90,9 +90,9 @@ func ignoreStatusForbiddenFromError(err error) error { /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache /// The service side has throttling control that delays responses if there're multiple requests onto certain vm /// resource request in short period. -func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, err error) { +func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt cacheReadType) (vm compute.VirtualMachine, err error) { vmName := string(nodeName) - cachedVM, err := az.vmCache.Get(vmName) + cachedVM, err := az.vmCache.Get(vmName, crt) if err != nil { return vm, err } @@ -104,8 +104,8 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM return *(cachedVM.(*compute.VirtualMachine)), nil } -func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) { - cachedRt, err := az.rtCache.Get(az.RouteTableName) +func (az *Cloud) getRouteTable(crt cacheReadType) (routeTable network.RouteTable, exists bool, err error) { + cachedRt, err := az.rtCache.Get(az.RouteTableName, crt) if err != nil { return routeTable, false, err } @@ -168,8 +168,8 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet return subnet, exists, err } -func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) { - cachedLB, err := az.lbCache.Get(name) +func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb network.LoadBalancer, exists bool, err error) { + cachedLB, err := az.lbCache.Get(name, crt) if err != nil { return lb, false, err } @@ -181,12 +181,12 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi return *(cachedLB.(*network.LoadBalancer)), true, nil } -func (az *Cloud) getSecurityGroup() (nsg network.SecurityGroup, err error) { +func (az *Cloud) getSecurityGroup(crt cacheReadType) (nsg network.SecurityGroup, err error) { if az.SecurityGroupName == "" { return nsg, fmt.Errorf("securityGroupName is not configured") } - securityGroup, err := az.nsgCache.Get(az.SecurityGroupName) + securityGroup, err := az.nsgCache.Get(az.SecurityGroupName, crt) if err != nil { return nsg, err }
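
The caching change above swaps client-go's NewTTLStore for a plain store plus a per-entry createdOn timestamp, so expiry is enforced at read time and callers can opt into stale data (allowUnsafeRead) in hot paths such as the disk reconcile loop, while the per-entry lock keeps concurrent reads of an expired key down to a single ARM call. The following is a minimal, standalone Go sketch of that read-time-TTL pattern, not the PR's code: it uses a plain map instead of cache.Store, and the names ttlCache, readFresh, and readUnsafe are invented for illustration only.

package main

import (
	"fmt"
	"sync"
	"time"
)

// readType mirrors the patch's cacheReadType: a normal read refreshes an
// expired entry via the getter, while an "unsafe" read tolerates stale data.
type readType int

const (
	readFresh  readType = iota // refetch when the entry is older than ttl
	readUnsafe                 // return whatever is cached, even if expired
)

type entry struct {
	mu        sync.Mutex
	data      interface{}
	createdOn time.Time
}

type ttlCache struct {
	mu     sync.Mutex
	store  map[string]*entry
	ttl    time.Duration
	getter func(key string) (interface{}, error)
}

func newTTLCache(ttl time.Duration, getter func(string) (interface{}, error)) *ttlCache {
	return &ttlCache{store: map[string]*entry{}, ttl: ttl, getter: getter}
}

// get returns cached data according to the requested read type. The entry is
// locked around the getter call so concurrent reads of an expired key trigger
// only one backend call, which is the same property the PR's test exercises.
func (c *ttlCache) get(key string, rt readType) (interface{}, error) {
	c.mu.Lock()
	e, ok := c.store[key]
	if !ok {
		e = &entry{}
		c.store[key] = e
	}
	c.mu.Unlock()

	e.mu.Lock()
	defer e.mu.Unlock()

	if e.data != nil {
		// Serve from cache when stale reads are allowed or the entry is fresh.
		if rt == readUnsafe || time.Since(e.createdOn) < c.ttl {
			return e.data, nil
		}
	}

	// Entry is missing or expired for a fresh read: refetch and restamp.
	data, err := c.getter(key)
	if err != nil {
		return nil, err
	}
	e.data = data
	e.createdOn = time.Now().UTC()
	return data, nil
}

func main() {
	calls := 0
	cache := newTTLCache(50*time.Millisecond, func(key string) (interface{}, error) {
		calls++
		return fmt.Sprintf("value-for-%s (call %d)", key, calls), nil
	})

	v, _ := cache.get("vm1", readFresh)
	fmt.Println(v, "calls:", calls) // fetched once

	time.Sleep(60 * time.Millisecond)
	v, _ = cache.get("vm1", readUnsafe) // expired, but a stale read is allowed
	fmt.Println(v, "calls:", calls)     // still 1 call

	v, _ = cache.get("vm1", readFresh) // expired and a fresh read was requested
	fmt.Println(v, "calls:", calls)    // now 2 calls
}

The design point this sketch isolates is the one the patch relies on: because expiry is checked on read rather than enforced by the store, an expired entry remains available for allowUnsafeRead callers (e.g. DisksAreAttached) and is refreshed only when a caller asks for fresh data or the cache is explicitly invalidated after attach/detach.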