diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go
index eeefe41c005..a45905f7a30 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_backoff.go
@@ -65,11 +65,11 @@ func (az *Cloud) Event(obj runtime.Object, eventtype, reason, message string) {
 }
 
 // GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry
-func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.VirtualMachine, error) {
+func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt cacheReadType) (compute.VirtualMachine, error) {
 	var machine compute.VirtualMachine
 	var retryErr error
 	err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
-		machine, retryErr = az.getVirtualMachine(name)
+		machine, retryErr = az.getVirtualMachine(name, crt)
 		if retryErr == cloudprovider.InstanceNotFound {
 			return true, cloudprovider.InstanceNotFound
 		}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go
index 90bc7e163ad..f5216e61ae3 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go
@@ -26,6 +26,20 @@ import (
 	"k8s.io/client-go/tools/cache"
 )
 
+// cacheReadType defines the read type for cache data
+type cacheReadType int
+
+const (
+	// cachedData returns the cached entry if it has not expired. If the
+	// entry has expired, the data is refetched with the getter, saved in
+	// the cache, and then returned.
+	cachedData cacheReadType = iota
+	// allowUnsafeRead returns the cached entry even if it has expired. If
+	// no entry exists in the cache, the data is fetched with the getter,
+	// saved in the cache, and returned.
+	allowUnsafeRead
+)
+
 // getFunc defines a getter function for timedCache.
 type getFunc func(key string) (interface{}, error)
 
@@ -36,6 +50,8 @@ type cacheEntry struct {
 
 	// The lock to ensure not updating same entry simultaneously.
 	lock sync.Mutex
+	// time when the entry was fetched and created
+	createdOn time.Time
 }
 
 // cacheKeyFunc defines the key function required in TTLStore.
@@ -48,6 +64,7 @@ type timedCache struct {
 	store  cache.Store
 	lock   sync.Mutex
 	getter getFunc
+	ttl    time.Duration
 }
 
 // newTimedcache creates a new timedCache.
@@ -58,7 +75,11 @@ func newTimedcache(ttl time.Duration, getter getFunc) (*timedCache, error) {
 
 	return &timedCache{
 		getter: getter,
-		store:  cache.NewTTLStore(cacheKeyFunc, ttl),
+		// Switch from NewTTLStore to NewStore so that entries can be reused
+		// by calls that are fine with reading expired/stale data.
+		// With NewTTLStore, entries are not returned once they have expired.
+		store: cache.NewStore(cacheKeyFunc),
+		ttl:   ttl,
 	}, nil
 }
 
@@ -69,16 +90,12 @@ func (t *timedCache) getInternal(key string) (*cacheEntry, error) {
 	if err != nil {
 		return nil, err
 	}
-	if exists {
-		return entry.(*cacheEntry), nil
-	}
-
+	// Lock here so that, when the entry does not exist, only one new entry
+	// is added and concurrent callers do not overwrite it.
 	t.lock.Lock()
 	defer t.lock.Unlock()
-	entry, exists, err = t.store.GetByKey(key)
-	if err != nil {
-		return nil, err
-	}
+
+	// if the entry exists, return it
 	if exists {
 		return entry.(*cacheEntry), nil
 	}
@@ -94,26 +111,38 @@ func (t *timedCache) getInternal(key string) (*cacheEntry, error) {
 }
 
 // Get returns the requested item by key.
-func (t *timedCache) Get(key string) (interface{}, error) {
+func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) {
 	entry, err := t.getInternal(key)
 	if err != nil {
 		return nil, err
 	}
 
-	// Data is still not cached yet, cache it by getter.
-	if entry.data == nil {
-		entry.lock.Lock()
-		defer entry.lock.Unlock()
+	entry.lock.Lock()
+	defer entry.lock.Unlock()
 
-		if entry.data == nil {
-			data, err := t.getter(key)
-			if err != nil {
-				return nil, err
-			}
-
-			entry.data = data
+	// entry exists
+	if entry.data != nil {
+		// unsafe read requested, so return the data even if it has expired
+		if crt == allowUnsafeRead {
+			return entry.data, nil
+		}
+		// return the cached data if it has not expired
+		if time.Since(entry.createdOn) < t.ttl && crt == cachedData {
+			return entry.data, nil
 		}
 	}
 
+	// Data is not cached yet, or the cached data has expired; fetch it with
+	// the getter. The entry is locked before fetching so that concurrent
+	// Gets do not result in multiple ARM calls.
+	data, err := t.getter(key)
+	if err != nil {
+		return nil, err
+	}
+
+	// Save the data in the cache and reset the creation time so that the
+	// TTL is measured from this fetch.
+	entry.data = data
+	entry.createdOn = time.Now().UTC()
+
 	return entry.data, nil
 }
@@ -129,7 +158,8 @@ func (t *timedCache) Delete(key string) error {
 // It is only used for testing.
 func (t *timedCache) Set(key string, data interface{}) {
 	t.store.Add(&cacheEntry{
-		key:  key,
-		data: data,
+		key:       key,
+		data:      data,
+		createdOn: time.Now().UTC(),
 	})
 }
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go
index 12a33a14125..af82027bda0 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go
@@ -99,7 +99,7 @@ func TestCacheGet(t *testing.T) {
 	for _, c := range cases {
 		dataSource, cache := newFakeCache(t)
 		dataSource.set(c.data)
-		val, err := cache.Get(c.key)
+		val, err := cache.Get(c.key, cachedData)
 		assert.NoError(t, err, c.name)
 		assert.Equal(t, c.expected, val, c.name)
 	}
@@ -113,7 +113,7 @@ func TestCacheGetError(t *testing.T) {
 	cache, err := newTimedcache(fakeCacheTTL, getter)
 	assert.NoError(t, err)
 
-	val, err := cache.Get("key")
+	val, err := cache.Get("key", cachedData)
 	assert.Error(t, err)
 	assert.Equal(t, getError, err)
 	assert.Nil(t, val)
@@ -128,13 +128,13 @@ func TestCacheDelete(t *testing.T) {
 	dataSource, cache := newFakeCache(t)
 	dataSource.set(data)
 
-	v, err := cache.Get(key)
+	v, err := cache.Get(key, cachedData)
 	assert.NoError(t, err)
 	assert.Equal(t, val, v, "cache should get correct data")
 
 	dataSource.set(nil)
 	cache.Delete(key)
-	v, err = cache.Get(key)
+	v, err = cache.Get(key, cachedData)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, dataSource.called)
 	assert.Equal(t, nil, v, "cache should get nil after data is removed")
@@ -149,14 +149,58 @@ func TestCacheExpired(t *testing.T) {
 	dataSource, cache := newFakeCache(t)
 	dataSource.set(data)
 
-	v, err := cache.Get(key)
+	v, err := cache.Get(key, cachedData)
 	assert.NoError(t, err)
 	assert.Equal(t, 1, dataSource.called)
 	assert.Equal(t, val, v, "cache should get correct data")
 
 	time.Sleep(fakeCacheTTL)
-	v, err = cache.Get(key)
+	v, err = cache.Get(key, cachedData)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, dataSource.called)
 	assert.Equal(t, val, v, "cache should get correct data even after expired")
 }
+
+func TestCacheAllowUnsafeRead(t *testing.T) {
+	key := "key1"
+	val := &fakeDataObj{}
+	data := map[string]*fakeDataObj{
+		key: val,
+	}
+	dataSource, cache := newFakeCache(t)
+	dataSource.set(data)
+
+	v, err := cache.Get(key, cachedData)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, dataSource.called)
+	assert.Equal(t, val, v, "cache should get correct data")
+
+	time.Sleep(fakeCacheTTL)
+	v, err = cache.Get(key, allowUnsafeRead)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, dataSource.called)
+	assert.Equal(t, val, v, "cache should return expired data when unsafe reads are allowed")
+}
+
+func TestCacheNoConcurrentGet(t *testing.T) {
+	key := "key1"
+	val := &fakeDataObj{}
+	data := map[string]*fakeDataObj{
+		key: val,
+	}
+	dataSource, cache := newFakeCache(t)
+	dataSource.set(data)
+
+	time.Sleep(fakeCacheTTL)
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			cache.Get(key, cachedData)
+		}()
+	}
+	v, err := cache.Get(key, cachedData)
+	wg.Wait()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, dataSource.called)
+	assert.Equal(t, val, v, "cache should get correct data")
+}
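The semantics that azure_cache.go introduces above (a store whose entries outlive their TTL, plus a per-call choice between a TTL-respecting read and a read that tolerates expired data) can be illustrated outside the patch with a small standalone sketch. The names here (readType, readFresh, readAllowStale, staleCache) are illustrative stand-ins rather than the provider's identifiers, and the sketch only approximates the behaviour of timedCache.Get:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// readType mirrors the idea of cacheReadType: each caller decides whether an
// expired entry is acceptable.
type readType int

const (
	readFresh      readType = iota // refetch when the entry is older than the TTL
	readAllowStale                 // return whatever is cached, even if expired
)

type entry struct {
	mu        sync.Mutex
	data      interface{}
	createdOn time.Time
}

type staleCache struct {
	mu     sync.Mutex
	ttl    time.Duration
	getter func(key string) (interface{}, error)
	items  map[string]*entry
}

func newStaleCache(ttl time.Duration, getter func(string) (interface{}, error)) *staleCache {
	return &staleCache{ttl: ttl, getter: getter, items: map[string]*entry{}}
}

func (c *staleCache) get(key string, rt readType) (interface{}, error) {
	// Create the entry under the cache lock so concurrent callers share it.
	c.mu.Lock()
	e, ok := c.items[key]
	if !ok {
		e = &entry{}
		c.items[key] = e
	}
	c.mu.Unlock()

	// The per-entry lock serializes readers of the same key, so an expired
	// entry is refreshed by exactly one backend call.
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.data != nil {
		if rt == readAllowStale || time.Since(e.createdOn) < c.ttl {
			return e.data, nil
		}
	}

	data, err := c.getter(key)
	if err != nil {
		return nil, err
	}
	e.data = data
	e.createdOn = time.Now()
	return e.data, nil
}

func main() {
	calls := 0
	cache := newStaleCache(50*time.Millisecond, func(key string) (interface{}, error) {
		calls++
		return fmt.Sprintf("%s-model-%d", key, calls), nil
	})

	v, _ := cache.get("vm1", readFresh)
	fmt.Println(v, "backend calls:", calls) // first read populates the cache

	time.Sleep(60 * time.Millisecond) // let the entry expire

	v, _ = cache.get("vm1", readAllowStale)
	fmt.Println(v, "backend calls:", calls) // stale read, no new backend call

	v, _ = cache.get("vm1", readFresh)
	fmt.Println(v, "backend calls:", calls) // TTL-respecting read refetches
}
```

The per-entry mutex is what gives the "no concurrent refetch" property exercised by TestCacheNoConcurrentGet above: for an expired key, the first caller refreshes the entry and every later caller then sees a fresh timestamp.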
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go
index 39e7073fffa..e16539ae4c0 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_common.go
@@ -69,7 +69,7 @@ type controllerCommon struct {
 }
 
 // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
-func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error) {
+func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt cacheReadType) (VMSet, error) {
 	// 1. vmType is standard, return cloud.vmSet directly.
 	if c.cloud.VMType == vmTypeStandard {
 		return c.cloud.vmSet, nil
@@ -82,7 +82,7 @@ func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName) (VMSet, error)
 	}
 
 	// 3. If the node is managed by availability set, then return ss.availabilitySet.
-	managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName))
+	managedByAS, err := ss.isNodeManagedByAvailabilitySet(mapNodeNameToVMName(nodeName), crt)
 	if err != nil {
 		return nil, err
 	}
@@ -124,14 +124,14 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
 		}
 	}
 
-	vmset, err := c.getNodeVMSet(nodeName)
+	vmset, err := c.getNodeVMSet(nodeName, allowUnsafeRead)
 	if err != nil {
 		return -1, err
 	}
 
 	instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
 	if err != nil {
-		klog.Warningf("failed to get azure instance id (%v)", err)
+		klog.Warningf("failed to get azure instance id (%v) for node %s", err, nodeName)
 		return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
 	}
 
@@ -162,7 +162,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
 		return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
 	}
 
-	vmset, err := c.getNodeVMSet(nodeName)
+	vmset, err := c.getNodeVMSet(nodeName, allowUnsafeRead)
 	if err != nil {
 		return err
 	}
@@ -197,18 +197,20 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
 }
 
 // getNodeDataDisks invokes vmSet interfaces to get data disks for the node.
-func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) {
-	vmset, err := c.getNodeVMSet(nodeName)
+func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
+	vmset, err := c.getNodeVMSet(nodeName, crt)
 	if err != nil {
 		return nil, err
 	}
 
-	return vmset.GetDataDisks(nodeName)
+	return vmset.GetDataDisks(nodeName, crt)
 }
 
 // GetDiskLun finds the lun on the host that the vhd is attached to, given a vhd's diskName and diskURI.
 func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) {
-	disks, err := c.getNodeDataDisks(nodeName)
+	// getNodeDataDisks needs cached data here, refreshed when the cache has
+	// expired, so that the LUN is resolved from the latest entry.
+	disks, err := c.getNodeDataDisks(nodeName, cachedData)
 	if err != nil {
 		klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
 		return -1, err
@@ -228,7 +230,7 @@ func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.N
 
 // GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used.
 func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) {
-	disks, err := c.getNodeDataDisks(nodeName)
+	disks, err := c.getNodeDataDisks(nodeName, cachedData)
 	if err != nil {
 		klog.Errorf("error of getting data disks for node %q: %v", nodeName, err)
 		return -1, err
@@ -255,7 +257,11 @@ func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.N
 		attached[diskName] = false
 	}
 
-	disks, err := c.getNodeDataDisks(nodeName)
+	// Do a stale read in getNodeDataDisks so that ARM is not called on every
+	// reconcile. The cache is invalidated after an attach/detach disk
+	// operation, so the first reconcile loop that runs after the operation
+	// fetches and caches a new entry that reflects the latest VM model.
+	disks, err := c.getNodeDataDisks(nodeName, allowUnsafeRead)
 	if err != nil {
 		if err == cloudprovider.InstanceNotFound {
 			// if host doesn't exist, no need to detach
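The read-type choices in azure_controller_common.go follow one rule: paths that feed an attach/detach decision (GetDiskLun, GetNextDiskLun) use cachedData, while the frequently polled DisksAreAttached tolerates stale data and relies on the cache being invalidated after a mutation. Below is a hedged, self-contained sketch of that invalidate-after-mutation flow; diskCache, attachDisk and disksAreAttached are hypothetical stand-ins, not the provider's functions:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// diskCache is a toy stand-in for the provider's timed cache: it remembers
// when each node's disk list was fetched and lets callers accept stale data.
type diskCache struct {
	mu        sync.Mutex
	ttl       time.Duration
	armCalls  int                 // counts simulated ARM GETs
	arm       map[string][]string // simulated ARM state: node -> attached disks
	cached    map[string][]string
	createdOn map[string]time.Time
}

func newDiskCache(ttl time.Duration) *diskCache {
	return &diskCache{
		ttl:       ttl,
		arm:       map[string][]string{},
		cached:    map[string][]string{},
		createdOn: map[string]time.Time{},
	}
}

// get returns the cached disk list; allowStale skips the TTL check, which is
// what a frequent reconcile loop wants.
func (c *diskCache) get(node string, allowStale bool) []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if d, ok := c.cached[node]; ok && (allowStale || time.Since(c.createdOn[node]) < c.ttl) {
		return d
	}
	c.armCalls++ // cache miss or expired entry: simulated ARM GET
	d := append([]string(nil), c.arm[node]...)
	c.cached[node] = d
	c.createdOn[node] = time.Now()
	return d
}

// invalidate mimics deleting the cache entry after an attach/detach, so the
// next read, even a stale-tolerant one, refetches the latest model.
func (c *diskCache) invalidate(node string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.cached, node)
	delete(c.createdOn, node)
}

func (c *diskCache) attachDisk(node, disk string) {
	c.mu.Lock()
	c.arm[node] = append(c.arm[node], disk)
	c.mu.Unlock()
	c.invalidate(node) // the VM model changed, drop the cached copy
}

// disksAreAttached is the reconcile-style check: stale reads are fine because
// attachDisk invalidates the entry whenever the answer can change.
func disksAreAttached(c *diskCache, node string, names []string) map[string]bool {
	attached := map[string]bool{}
	current := c.get(node, true)
	for _, n := range names {
		attached[n] = false
		for _, d := range current {
			if d == n {
				attached[n] = true
			}
		}
	}
	return attached
}

func main() {
	c := newDiskCache(time.Minute)
	fmt.Println(disksAreAttached(c, "node1", []string{"disk1"}), "ARM calls:", c.armCalls) // miss: 1 call
	fmt.Println(disksAreAttached(c, "node1", []string{"disk1"}), "ARM calls:", c.armCalls) // cached: still 1
	c.attachDisk("node1", "disk1")
	fmt.Println(disksAreAttached(c, "node1", []string{"disk1"}), "ARM calls:", c.armCalls) // invalidated: 2 calls
}
```

The point of the pattern shows up in the ARM call counter: repeated reconciles cost nothing, and only the first read after attachDisk pays for a refetch.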
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go
index 7490d93fcf4..7cdf980b2fa 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard.go
@@ -31,7 +31,7 @@ import (
 // AttachDisk attaches a vhd to vm
 // the vhd must exist, can be identified by diskName, diskURI, and lun.
 func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
-	vm, err := as.getVirtualMachine(nodeName)
+	vm, err := as.getVirtualMachine(nodeName, cachedData)
 	if err != nil {
 		return err
 	}
@@ -102,7 +102,7 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
 // DetachDisk detaches a disk from host
 // the vhd can be identified by diskName or diskURI
 func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
-	vm, err := as.getVirtualMachine(nodeName)
+	vm, err := as.getVirtualMachine(nodeName, cachedData)
 	if err != nil {
 		// if host doesn't exist, no need to detach
 		klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI)
@@ -155,8 +155,8 @@ func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.N
 }
 
 // GetDataDisks gets a list of data disks attached to the node.
-func (as *availabilitySet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) {
-	vm, err := as.getVirtualMachine(nodeName)
+func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
+	vm, err := as.getVirtualMachine(nodeName, crt)
 	if err != nil {
 		return nil, err
 	}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go
index 630a6eb9473..f8d7c15381c 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_standard_test.go
@@ -20,6 +20,7 @@ package azure
 
 import (
 	"testing"
+	"time"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
 	"github.com/Azure/go-autorest/autorest/to"
@@ -99,12 +100,14 @@ func TestGetDataDisks(t *testing.T) {
 		nodeName          types.NodeName
 		expectedDataDisks []compute.DataDisk
 		expectedError     bool
+		crt               cacheReadType
 	}{
 		{
 			desc:              "an error shall be returned if there's no corresponding vm",
 			nodeName:          "vm2",
 			expectedDataDisks: nil,
 			expectedError:     true,
+			crt:               cachedData,
 		},
 		{
 			desc:     "correct list of data disks shall be returned if everything is good",
 			nodeName: "vm1",
 			expectedDataDisks: []compute.DataDisk{
 				{
 					Lun:  to.Int32Ptr(0),
 					Name: to.StringPtr("disk1"),
 				},
 			},
 			expectedError: false,
+			crt:           cachedData,
+		},
+		{
+			desc:     "correct list of data disks shall be returned when stale reads are allowed",
+			nodeName: "vm1",
+			expectedDataDisks: []compute.DataDisk{
+				{
+					Lun:  to.Int32Ptr(0),
+					Name: to.StringPtr("disk1"),
+				},
+			},
+			expectedError: false,
+			crt:           allowUnsafeRead,
+		},
 	}
 	for i, test := range testCases {
 		testCloud := getTestCloud()
 		vmSet := testCloud.vmSet
 		setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
 
-		dataDisks, err := vmSet.GetDataDisks(test.nodeName)
+		dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt)
 		assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
 		assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
+
+		if test.crt == allowUnsafeRead {
+			time.Sleep(fakeCacheTTL)
+			dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt)
+			assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc)
+			assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
+		}
 	}
 }
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go
index 0d8468dfd7d..4e899ef1c1b 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go
@@ -32,7 +32,7 @@ import (
 // the vhd must exist, can be identified by diskName, diskURI, and lun.
 func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
 	vmName := mapNodeNameToVMName(nodeName)
-	ssName, instanceID, vm, err := ss.getVmssVM(vmName)
+	ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData)
 	if err != nil {
 		return err
 	}
@@ -109,7 +109,7 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
 // the vhd can be identified by diskName or diskURI
 func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
 	vmName := mapNodeNameToVMName(nodeName)
-	ssName, instanceID, vm, err := ss.getVmssVM(vmName)
+	ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData)
 	if err != nil {
 		return nil, err
 	}
@@ -167,8 +167,8 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
 }
 
 // GetDataDisks gets a list of data disks attached to the node.
-func (ss *scaleSet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) {
-	_, _, vm, err := ss.getVmssVM(string(nodeName))
+func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
+	_, _, vm, err := ss.getVmssVM(string(nodeName), crt)
 	if err != nil {
 		return nil, err
 	}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go
index f4df42ad6c6..30799917835 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_fakes.go
@@ -950,7 +950,7 @@ func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
 	return nil, fmt.Errorf("unimplemented")
 }
 
-func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error) {
+func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {
 	return nil, fmt.Errorf("unimplemented")
 }
 
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go
index 187e77f679f..40df24eab86 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_instance_metadata.go
@@ -145,7 +145,7 @@ func (ims *InstanceMetadataService) getInstanceMetadata(key string) (interface{}
 
 // GetMetadata gets instance metadata from cache.
 func (ims *InstanceMetadataService) GetMetadata() (*InstanceMetadata, error) {
-	cache, err := ims.imsCache.Get(metadataCacheKey)
+	cache, err := ims.imsCache.Get(metadataCacheKey, cachedData)
 	if err != nil {
 		return nil, err
 	}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go
index 05aa5adfa4d..7486d7c6eca 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go
@@ -962,7 +962,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
 
 	if isInternal {
 		// Refresh updated lb which will be used later in other places.
-		newLB, exist, err := az.getAzureLoadBalancer(lbName)
+		newLB, exist, err := az.getAzureLoadBalancer(lbName, cachedData)
 		if err != nil {
 			klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err)
 			return nil, err
@@ -1125,7 +1125,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
 		ports = []v1.ServicePort{}
 	}
 
-	sg, err := az.getSecurityGroup()
+	sg, err := az.getSecurityGroup(cachedData)
 	if err != nil {
 		return nil, err
 	}
@@ -1466,7 +1466,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa
 	}
 
 	if lbName != "" {
-		loadBalancer, _, err := az.getAzureLoadBalancer(lbName)
+		loadBalancer, _, err := az.getAzureLoadBalancer(lbName, cachedData)
 		if err != nil {
 			return nil, err
 		}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go
index 3cc689fa597..9077eb5ad5b 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go
@@ -46,7 +46,7 @@ const (
 // ListRoutes lists all managed routes that belong to the specified clusterName
 func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
 	klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName)
-	routeTable, existsRouteTable, err := az.getRouteTable()
+	routeTable, existsRouteTable, err := az.getRouteTable(cachedData)
 	routes, err := processRoutes(routeTable, existsRouteTable, err)
 	if err != nil {
 		return nil, err
@@ -102,7 +102,7 @@ func processRoutes(routeTable network.RouteTable, exists bool, err error) ([]*cl
 }
 
 func (az *Cloud) createRouteTableIfNotExists(clusterName string, kubeRoute *cloudprovider.Route) error {
-	if _, existsRouteTable, err := az.getRouteTable(); err != nil {
+	if _, existsRouteTable, err := az.getRouteTable(cachedData); err != nil {
 		klog.V(2).Infof("createRouteTableIfNotExists error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
 		return err
 	} else if existsRouteTable {
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go
index 444584edf37..b4f716160ba 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_standard.go
@@ -375,14 +375,14 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error)
 	var machine compute.VirtualMachine
 	var err error
 
-	machine, err = as.getVirtualMachine(types.NodeName(name))
+	machine, err = as.getVirtualMachine(types.NodeName(name), allowUnsafeRead)
 	if err == cloudprovider.InstanceNotFound {
 		return "", cloudprovider.InstanceNotFound
 	}
 	if err != nil {
 		if as.CloudProviderBackoff {
 			klog.V(2).Infof("GetInstanceIDByNodeName(%s) backing off", name)
-			machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name))
+			machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name), allowUnsafeRead)
 			if err != nil {
 				klog.V(2).Infof("GetInstanceIDByNodeName(%s) abort backoff", name)
 				return "", err
@@ -403,7 +403,7 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error)
 
 // GetPowerStatusByNodeName returns the power state of the specified node.
 func (as *availabilitySet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
-	vm, err := as.getVirtualMachine(types.NodeName(name))
+	vm, err := as.getVirtualMachine(types.NodeName(name), cachedData)
 	if err != nil {
 		return powerState, err
 	}
@@ -436,7 +436,7 @@ func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.Nod
 
 // GetInstanceTypeByNodeName gets the instance type by node name.
 func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) {
-	machine, err := as.getVirtualMachine(types.NodeName(name))
+	machine, err := as.getVirtualMachine(types.NodeName(name), allowUnsafeRead)
 	if err != nil {
 		klog.Errorf("as.GetInstanceTypeByNodeName(%s) failed: as.getVirtualMachine(%s) err=%v", name, name, err)
 		return "", err
@@ -448,7 +448,7 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error
 
 // GetZoneByNodeName gets availability zone for the specified node. If the node is not running
 // with availability zone, then it returns fault domain.
 func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
-	vm, err := as.getVirtualMachine(types.NodeName(name))
+	vm, err := as.getVirtualMachine(types.NodeName(name), allowUnsafeRead)
 	if err != nil {
 		return cloudprovider.Zone{}, err
 	}
@@ -649,7 +649,7 @@ func extractResourceGroupByNicID(nicID string) (string, error) {
 func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) {
 	var machine compute.VirtualMachine
 
-	machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName))
+	machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName), cachedData)
 	if err != nil {
 		klog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName)
 		return network.Interface{}, err
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go
index 26b6e1292c8..2d7347ae113 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmsets.go
@@ -70,7 +70,7 @@ type VMSet interface {
 	// DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI.
 	DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error)
 	// GetDataDisks gets a list of data disks attached to the node.
-	GetDataDisks(nodeName types.NodeName) ([]compute.DataDisk, error)
+	GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error)
 
 	// GetPowerStatusByNodeName returns the power state of the specified node.
 	GetPowerStatusByNodeName(name string) (string, error)
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
index 5a4b657cad9..ffff81d8922 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
@@ -88,9 +88,9 @@ func newScaleSet(az *Cloud) (VMSet, error) {
 
 // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
-func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
 	getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
-		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 		if err != nil {
 			return "", "", nil, err
 		}
@@ -132,7 +132,7 @@ func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.Virtual
 
 // GetPowerStatusByNodeName returns the power state of the specified node.
 func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
-	_, _, vm, err := ss.getVmssVM(name)
+	_, _, vm, err := ss.getVmssVM(name, cachedData)
 	if err != nil {
 		return powerState, err
 	}
@@ -154,9 +154,9 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
 
 // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
 // The node must belong to one of scale sets.
-func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (*compute.VirtualMachineScaleSetVM, error) {
+func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
 	getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
-		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 		if err != nil {
 			return nil, false, err
 		}
@@ -203,7 +203,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
 // It must return ("", cloudprovider.InstanceNotFound) if the instance does
 // not exist or is no longer running.
 func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
-	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
+	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead)
 	if err != nil {
 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
 		return "", err
@@ -213,7 +213,7 @@ func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
 		return ss.availabilitySet.GetInstanceIDByNodeName(name)
 	}
 
-	_, _, vm, err := ss.getVmssVM(name)
+	_, _, vm, err := ss.getVmssVM(name, allowUnsafeRead)
 	if err != nil {
 		return "", err
 	}
@@ -247,7 +247,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
 		return ss.availabilitySet.GetNodeNameByProviderID(providerID)
 	}
 
-	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID)
+	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, allowUnsafeRead)
 	if err != nil {
 		return "", err
 	}
@@ -262,7 +262,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
 
 // GetInstanceTypeByNodeName gets the instance type by node name.
 func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
-	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
+	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead)
 	if err != nil {
 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
 		return "", err
@@ -272,7 +272,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
 		return ss.availabilitySet.GetInstanceTypeByNodeName(name)
 	}
 
-	_, _, vm, err := ss.getVmssVM(name)
+	_, _, vm, err := ss.getVmssVM(name, allowUnsafeRead)
 	if err != nil {
 		return "", err
 	}
@@ -287,7 +287,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
 
 // GetZoneByNodeName gets availability zone for the specified node. If the node is not running
 // with availability zone, then it returns fault domain.
 func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
-	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
+	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, allowUnsafeRead)
 	if err != nil {
 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
 		return cloudprovider.Zone{}, err
@@ -297,7 +297,7 @@ func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
 		return ss.availabilitySet.GetZoneByNodeName(name)
 	}
 
-	_, _, vm, err := ss.getVmssVM(name)
+	_, _, vm, err := ss.getVmssVM(name, allowUnsafeRead)
 	if err != nil {
 		return cloudprovider.Zone{}, err
 	}
@@ -536,7 +536,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
 		}
 
 		nodeName := nodes[nx].Name
-		ssName, _, _, err := ss.getVmssVM(nodeName)
+		ssName, _, _, err := ss.getVmssVM(nodeName, cachedData)
 		if err != nil {
 			return nil, err
 		}
@@ -614,7 +614,7 @@ func extractResourceGroupByVMSSNicID(nicID string) (string, error) {
 
 // GetPrimaryInterface gets machine primary network interface by node name and vmSet.
 func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) {
-	managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName)
+	managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, cachedData)
 	if err != nil {
 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
 		return network.Interface{}, err
@@ -624,7 +624,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
 		return ss.availabilitySet.GetPrimaryInterface(nodeName)
 	}
 
-	ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
+	ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cachedData)
 	if err != nil {
 		// VM is availability set, but not cached yet in availabilitySetNodesCache.
 		if err == ErrorNotVmssInstance {
@@ -747,7 +747,7 @@ func (ss *scaleSet) getConfigForScaleSetByIPFamily(config *compute.VirtualMachin
 func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error {
 	klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID)
 	vmName := mapNodeNameToVMName(nodeName)
-	ssName, instanceID, vm, err := ss.getVmssVM(vmName)
+	ssName, instanceID, vm, err := ss.getVmssVM(vmName, cachedData)
 	if err != nil {
 		return err
 	}
@@ -1027,7 +1027,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
 
 		f := func() error {
 			// Check whether the node is VMAS virtual machine.
-			managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName)
+			managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, cachedData)
 			if err != nil {
 				klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err)
 				return err
@@ -1068,7 +1068,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac
 
 // ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node.
 func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error {
-	ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
+	ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cachedData)
 	if err != nil {
 		return err
 	}
@@ -1167,7 +1167,7 @@ func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (st
 	resourceGroup := matches[1]
 	scaleSetName := matches[2]
 	instanceID := matches[3]
-	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID)
+	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, allowUnsafeRead)
 	if err != nil {
 		return "", err
 	}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
index 398dfed0ec5..633a9106245 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
@@ -47,6 +47,7 @@ type vmssVirtualMachinesEntry struct {
 	vmssName       string
 	instanceID     string
 	virtualMachine *compute.VirtualMachineScaleSetVM
+	lastUpdate     time.Time
 }
 
 func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string {
@@ -101,6 +102,7 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 					vmssName:       ssName,
 					instanceID:     to.String(vm.InstanceID),
 					virtualMachine: &vm,
+					lastUpdate:     time.Now().UTC(),
 				})
 			}
 		}
@@ -113,7 +115,7 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 }
 
 func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
-	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, allowUnsafeRead)
 	if err != nil {
 		return err
 	}
@@ -150,8 +152,8 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
 	return newTimedcache(availabilitySetNodesCacheTTL, getter)
 }
 
-func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string) (bool, error) {
-	cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey)
+func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt cacheReadType) (bool, error) {
+	cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey, crt)
 	if err != nil {
 		return false, err
 	}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
index c6596d7bc64..a5998e53dcf 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
@@ -85,7 +85,7 @@ func TestVMSSVMCache(t *testing.T) {
 	for i := range virtualMachines {
 		vm := virtualMachines[i]
 		vmName := to.String(vm.OsProfile.ComputerName)
-		ssName, instanceID, realVM, err := ss.getVmssVM(vmName)
+		ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cachedData)
 		assert.NoError(t, err)
 		assert.Equal(t, "vmss", ssName)
 		assert.Equal(t, to.String(vm.InstanceID), instanceID)
@@ -99,14 +99,14 @@ func TestVMSSVMCache(t *testing.T) {
 	assert.NoError(t, err)
 
 	// the VM should be removed from cache after deleteCacheForNode().
-	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cachedData)
 	assert.NoError(t, err)
 	cachedVirtualMachines := cached.(*sync.Map)
 	_, ok := cachedVirtualMachines.Load(vmName)
 	assert.Equal(t, false, ok)
 
 	// the VM should be get back after another cache refresh.
-	ssName, instanceID, realVM, err := ss.getVmssVM(vmName)
+	ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cachedData)
 	assert.NoError(t, err)
 	assert.Equal(t, "vmss", ssName)
 	assert.Equal(t, to.String(vm.InstanceID), instanceID)
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go
index 5da34a1f407..7443f86c211 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_wrap.go
@@ -90,9 +90,9 @@ func ignoreStatusForbiddenFromError(err error) error {
 /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache
 /// The service side has throttling control that delays responses if there're multiple requests onto certain vm
 /// resource request in short period.
-func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachine, err error) {
+func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt cacheReadType) (vm compute.VirtualMachine, err error) {
 	vmName := string(nodeName)
-	cachedVM, err := az.vmCache.Get(vmName)
+	cachedVM, err := az.vmCache.Get(vmName, crt)
 	if err != nil {
 		return vm, err
 	}
@@ -104,8 +104,8 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM
 	return *(cachedVM.(*compute.VirtualMachine)), nil
 }
 
-func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) {
-	cachedRt, err := az.rtCache.Get(az.RouteTableName)
+func (az *Cloud) getRouteTable(crt cacheReadType) (routeTable network.RouteTable, exists bool, err error) {
+	cachedRt, err := az.rtCache.Get(az.RouteTableName, crt)
 	if err != nil {
 		return routeTable, false, err
 	}
@@ -168,8 +168,8 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet
 	return subnet, exists, err
 }
 
-func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) {
-	cachedLB, err := az.lbCache.Get(name)
+func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb network.LoadBalancer, exists bool, err error) {
+	cachedLB, err := az.lbCache.Get(name, crt)
 	if err != nil {
 		return lb, false, err
 	}
@@ -181,12 +181,12 @@ func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exi
 	return *(cachedLB.(*network.LoadBalancer)), true, nil
 }
 
-func (az *Cloud) getSecurityGroup() (nsg network.SecurityGroup, err error) {
+func (az *Cloud) getSecurityGroup(crt cacheReadType) (nsg network.SecurityGroup, err error) {
 	if az.SecurityGroupName == "" {
 		return nsg, fmt.Errorf("securityGroupName is not configured")
 	}
 
-	securityGroup, err := az.nsgCache.Get(az.SecurityGroupName)
+	securityGroup, err := az.nsgCache.Get(az.SecurityGroupName, crt)
 	if err != nil {
 		return nsg, err
 	}
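The azure_wrap.go getters above all share one shape: read the resource from its timed cache with the caller-supplied read type, treat a nil payload as "not found", and otherwise unwrap the typed value. A minimal sketch of that shape, assuming a hypothetical getResource helper and errNotFound sentinel (the real getters return typed Azure SDK structs and, in getVirtualMachine's case, cloudprovider.InstanceNotFound):

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for the "exists == false" result of the real getters.
var errNotFound = errors.New("resource not found")

// cacheGetter is the shape of a timed-cache read after this patch: a key plus
// a flag saying whether stale data is acceptable.
type cacheGetter func(key string, allowStale bool) (interface{}, error)

// getResource mirrors the wrapper pattern of getVirtualMachine, getRouteTable
// and getAzureLoadBalancer: read from the cache, map a nil payload to "not
// found", and unwrap the typed value.
func getResource(get cacheGetter, key string, allowStale bool) (string, error) {
	cached, err := get(key, allowStale)
	if err != nil {
		return "", err
	}
	if cached == nil {
		return "", errNotFound
	}
	return cached.(string), nil
}

func main() {
	// A fake cache read: "vm1" exists, anything else was cached as absent.
	fake := func(key string, allowStale bool) (interface{}, error) {
		if key == "vm1" {
			return "vm1-model", nil
		}
		return nil, nil
	}

	if v, err := getResource(fake, "vm1", false); err == nil {
		fmt.Println("got:", v)
	}
	if _, err := getResource(fake, "vm2", true); errors.Is(err, errNotFound) {
		fmt.Println("vm2 not found")
	}
}
```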