diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go
index 38f21378b49..3ee1ccbf4b9 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache.go
@@ -38,6 +38,9 @@ const (
 	// active/expired. If entry doesn't exist in cache, then data is fetched
 	// using getter, saved in cache and returned
 	cacheReadTypeUnsafe
+	// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry
+	// is not expired
+	cacheReadTypeForceRefresh
 )
 
 // getFunc defines a getter function for timedCache.
@@ -120,20 +123,20 @@ func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) {
 	entry.lock.Lock()
 	defer entry.lock.Unlock()
-	// entry exists
-	if entry.data != nil {
+	// entry exists and if cache is not force refreshed
+	if entry.data != nil && crt != cacheReadTypeForceRefresh {
 		// allow unsafe read, so return data even if expired
 		if crt == cacheReadTypeUnsafe {
 			return entry.data, nil
 		}
 		// if cached data is not expired, return cached data
-		if time.Since(entry.createdOn) < t.ttl {
+		if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < t.ttl {
 			return entry.data, nil
 		}
 	}
 
-	// Data is not cached yet or cache data is expired, cache it by getter.
-	// entry is locked before getting to ensure concurrent gets don't result in
-	// multiple ARM calls.
+	// Data is not cached yet, cache data is expired or requested force refresh
+	// cache it by getter. entry is locked before getting to ensure concurrent
+	// gets don't result in multiple ARM calls.
 	data, err := t.getter(key)
 	if err != nil {
 		return nil, err
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go
index 08b8f3d48d7..d50f34fbc71 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_cache_test.go
@@ -204,3 +204,23 @@ func TestCacheNoConcurrentGet(t *testing.T) {
 	assert.Equal(t, 1, dataSource.called)
 	assert.Equal(t, val, v, "cache should get correct data")
 }
+
+func TestCacheForceRefresh(t *testing.T) {
+	key := "key1"
+	val := &fakeDataObj{}
+	data := map[string]*fakeDataObj{
+		key: val,
+	}
+	dataSource, cache := newFakeCache(t)
+	dataSource.set(data)
+
+	v, err := cache.Get(key, cacheReadTypeDefault)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, dataSource.called)
+	assert.Equal(t, val, v, "cache should get correct data")
+
+	v, err = cache.Get(key, cacheReadTypeForceRefresh)
+	assert.NoError(t, err)
+	assert.Equal(t, 2, dataSource.called)
+	assert.Equal(t, val, v, "should refetch unexpired data as forced refresh")
+}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
index b7950a40d84..2f3bf320896 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
@@ -130,19 +130,21 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua
 // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
 func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
-	getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+	getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
+		var found bool
 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 		if err != nil {
-			return "", "", nil, err
+			return "", "", nil, found, err
 		}
 
 		virtualMachines := cached.(*sync.Map)
 		if vm, ok := virtualMachines.Load(nodeName); ok {
 			result := vm.(*vmssVirtualMachinesEntry)
-			return result.vmssName, result.instanceID, result.virtualMachine, nil
+			found = true
+			return result.vmssName, result.instanceID, result.virtualMachine, found, nil
 		}
 
-		return "", "", nil, nil
+		return "", "", nil, found, nil
 	}
 
 	_, err := getScaleSetVMInstanceID(nodeName)
@@ -150,22 +152,24 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin
 		return "", "", nil, err
 	}
 
-	vmssName, instanceID, vm, err := getter(nodeName)
+	vmssName, instanceID, vm, found, err := getter(nodeName, crt)
 	if err != nil {
 		return "", "", nil, err
 	}
-	if vm != nil {
+
+	if !found {
+		klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
+		vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
+		if err != nil {
+			return "", "", nil, err
+		}
+	}
+
+	if found && vm != nil {
 		return vmssName, instanceID, vm, nil
 	}
 
-	klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
-	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
-	vmssName, instanceID, vm, err = getter(nodeName)
-	if err != nil {
-		return "", "", nil, err
-	}
-
-	if vm == nil {
+	if !found || vm == nil {
 		return "", "", nil, cloudprovider.InstanceNotFound
 	}
 	return vmssName, instanceID, vm, nil
@@ -196,7 +200,7 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
 // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
 // The node must belong to one of scale sets.
 func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
-	getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
+	getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 		if err != nil {
 			return nil, false, err
@@ -219,21 +223,21 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
 		return vm, found, nil
 	}
 
-	vm, found, err := getter()
-	if err != nil {
-		return nil, err
-	}
-	if found {
-		return vm, nil
-	}
-
-	klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
-	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
-	vm, found, err = getter()
+	vm, found, err := getter(crt)
 	if err != nil {
 		return nil, err
 	}
 
 	if !found {
+		klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
+		vm, found, err = getter(cacheReadTypeForceRefresh)
+		if err != nil {
+			return nil, err
+		}
+	}
+	if found && vm != nil {
+		return vm, nil
+	}
+	if !found || vm == nil {
 		return nil, cloudprovider.InstanceNotFound
 	}
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
index 4df9dcaa5db..90082f0e47c 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
@@ -111,6 +111,26 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 	getter := func(key string) (interface{}, error) {
 		localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
 
+		oldCache := make(map[string]vmssVirtualMachinesEntry)
+
+		if ss.vmssVMCache != nil {
+			// get old cache before refreshing the cache
+			entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey)
+			if err != nil {
+				return nil, err
+			}
+			if exists {
+				cached := entry.(*cacheEntry).data
+				if cached != nil {
+					virtualMachines := cached.(*sync.Map)
+					virtualMachines.Range(func(key, value interface{}) bool {
+						oldCache[key.(string)] = *value.(*vmssVirtualMachinesEntry)
+						return true
+					})
+				}
+			}
+		}
+
 		allResourceGroups, err := ss.GetResourceGroups()
 		if err != nil {
 			return nil, err
@@ -143,8 +163,38 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 					virtualMachine: &vm,
 					lastUpdate:     time.Now().UTC(),
 				})
+
+				if _, exists := oldCache[computerName]; exists {
+					delete(oldCache, computerName)
+				}
 			}
 		}
+
+			// add old missing cache data with nil entries to prevent aggressive
+			// ARM calls during cache invalidation
+			for name, vmEntry := range oldCache {
+				// if the nil cache entry has existed for 15 minutes in the cache
+				// then it should not be added back to the cache
+				if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute {
+					klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
+					continue
+				}
+				lastUpdate := time.Now().UTC()
+				if vmEntry.virtualMachine == nil {
+					// if this is already a nil entry then keep the time the nil
+					// entry was first created, so we can cleanup unwanted entries
+					lastUpdate = vmEntry.lastUpdate
+				}
+
+				klog.V(5).Infof("adding old entries to new cache for %s", name)
+				localCache.Store(name, &vmssVirtualMachinesEntry{
+					resourceGroup:  vmEntry.resourceGroup,
+					vmssName:       vmEntry.vmssName,
+					instanceID:     vmEntry.instanceID,
+					virtualMachine: nil,
+					lastUpdate:     lastUpdate,
+				})
+			}
 		}
 
 		return localCache, nil
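Not part of the patch: as a quick reference, the following standalone Go sketch illustrates the read semantics the patch gives the cache's Get path. The cacheReadType constants mirror the ones added above; the miniCache type, its fields, and main() are simplified, illustrative stand-ins written for this note, not the real timedCache implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

type cacheReadType int

const (
	cacheReadTypeDefault cacheReadType = iota
	cacheReadTypeUnsafe
	cacheReadTypeForceRefresh
)

type cacheEntry struct {
	data      interface{}
	createdOn time.Time
}

// miniCache is an illustrative stand-in for the azure timedCache.
type miniCache struct {
	mu     sync.Mutex
	ttl    time.Duration
	store  map[string]*cacheEntry
	getter func(key string) (interface{}, error)
}

func (c *miniCache) Get(key string, crt cacheReadType) (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	entry, ok := c.store[key]
	// Serve from cache only when an entry exists and no force refresh was requested.
	if ok && entry.data != nil && crt != cacheReadTypeForceRefresh {
		// Unsafe reads return cached data even if it is past its TTL.
		if crt == cacheReadTypeUnsafe {
			return entry.data, nil
		}
		// Default reads return cached data only while it is within the TTL.
		if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < c.ttl {
			return entry.data, nil
		}
	}

	// Cache miss, expired entry, or forced refresh: call the getter and re-cache.
	data, err := c.getter(key)
	if err != nil {
		return nil, err
	}
	c.store[key] = &cacheEntry{data: data, createdOn: time.Now().UTC()}
	return data, nil
}

func main() {
	calls := 0
	cache := &miniCache{
		ttl:   time.Minute,
		store: map[string]*cacheEntry{},
		getter: func(key string) (interface{}, error) {
			calls++
			return fmt.Sprintf("%s-refresh-%d", key, calls), nil
		},
	}

	v, _ := cache.Get("node1", cacheReadTypeDefault) // miss: getter runs, calls == 1
	fmt.Println(v, calls)

	v, _ = cache.Get("node1", cacheReadTypeDefault) // hit: unexpired entry served from cache
	fmt.Println(v, calls)

	v, _ = cache.Get("node1", cacheReadTypeForceRefresh) // forced: getter runs again, calls == 2
	fmt.Println(v, calls)
}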