Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 19:31:44 +00:00)
Merge pull request #87531 from aramase/set-nil-cache
azure: set nil cache entry based on old cache
Commit: e2f529adaf
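In short: the Azure timedCache gains a cacheReadTypeForceRefresh read type; the VMSS getters (getVmssVM, getVmssVMByInstanceID) thread the caller's read type through and, on a cache miss, retry once with a force refresh instead of deleting the shared cache key; and newVMSSVirtualMachinesCache now snapshots the old cache before refreshing, re-adding entries that vanished from the ARM listing as nil placeholders (for up to 15 minutes) so repeated lookups for a disappearing node don't each hit ARM.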
@@ -38,6 +38,9 @@ const (
 	// active/expired. If entry doesn't exist in cache, then data is fetched
 	// using getter, saved in cache and returned
 	cacheReadTypeUnsafe
+	// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry
+	// is not expired
+	cacheReadTypeForceRefresh
 )
 
 // getFunc defines a getter function for timedCache.
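For context, these read types form a small enum. A minimal sketch of the full const block, where the cacheReadType type name and the cacheReadTypeDefault value with its comment are assumptions taken from the surrounding code rather than shown in this hunk:

// cacheReadType defines how a timedCache read treats existing entries.
type cacheReadType int

const (
	// cacheReadTypeDefault returns data from cache only if the entry is
	// not expired (assumed: this constant sits just above the hunk)
	cacheReadTypeDefault cacheReadType = iota
	// cacheReadTypeUnsafe returns data from cache even if the entry is
	// active/expired. If entry doesn't exist in cache, then data is fetched
	// using getter, saved in cache and returned
	cacheReadTypeUnsafe
	// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry
	// is not expired
	cacheReadTypeForceRefresh
)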
@@ -120,20 +123,20 @@ func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) {
 	entry.lock.Lock()
 	defer entry.lock.Unlock()
 
-	// entry exists
-	if entry.data != nil {
+	// entry exists and if cache is not force refreshed
+	if entry.data != nil && crt != cacheReadTypeForceRefresh {
 		// allow unsafe read, so return data even if expired
 		if crt == cacheReadTypeUnsafe {
 			return entry.data, nil
 		}
 		// if cached data is not expired, return cached data
-		if time.Since(entry.createdOn) < t.ttl {
+		if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < t.ttl {
 			return entry.data, nil
 		}
 	}
-	// Data is not cached yet or cache data is expired, cache it by getter.
-	// entry is locked before getting to ensure concurrent gets don't result in
-	// multiple ARM calls.
+	// Data is not cached yet, cache data is expired or requested force refresh
+	// cache it by getter. entry is locked before getting to ensure concurrent
+	// gets don't result in multiple ARM calls.
 	data, err := t.getter(key)
 	if err != nil {
 		return nil, err
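Putting the new branches together, here is a self-contained sketch of the resulting Get flow, simplified to a single entry with no per-key locking; the cache, entry, and getter shapes are assumptions based on this hunk, not the upstream types:

package main

import (
	"fmt"
	"time"
)

type cacheReadType int

const (
	cacheReadTypeDefault cacheReadType = iota
	cacheReadTypeUnsafe
	cacheReadTypeForceRefresh
)

type entry struct {
	data      interface{}
	createdOn time.Time
}

type cache struct {
	ttl    time.Duration
	entry  *entry
	getter func(key string) (interface{}, error)
}

// Get mirrors the patched flow: serve from cache unless a force refresh is
// requested; unsafe reads ignore expiry; default reads honor the TTL.
func (c *cache) Get(key string, crt cacheReadType) (interface{}, error) {
	if c.entry != nil && c.entry.data != nil && crt != cacheReadTypeForceRefresh {
		if crt == cacheReadTypeUnsafe {
			return c.entry.data, nil
		}
		if crt == cacheReadTypeDefault && time.Since(c.entry.createdOn) < c.ttl {
			return c.entry.data, nil
		}
	}
	// Not cached, expired, or force refresh requested: hit the getter.
	data, err := c.getter(key)
	if err != nil {
		return nil, err
	}
	c.entry = &entry{data: data, createdOn: time.Now().UTC()}
	return data, nil
}

func main() {
	calls := 0
	c := &cache{
		ttl: time.Minute,
		getter: func(key string) (interface{}, error) {
			calls++
			return fmt.Sprintf("%s#%d", key, calls), nil
		},
	}
	v1, _ := c.Get("k", cacheReadTypeDefault)      // fetch: k#1
	v2, _ := c.Get("k", cacheReadTypeDefault)      // cached: k#1
	v3, _ := c.Get("k", cacheReadTypeForceRefresh) // refetch: k#2
	fmt.Println(v1, v2, v3, calls)                 // k#1 k#1 k#2 2
}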
@@ -204,3 +204,23 @@ func TestCacheNoConcurrentGet(t *testing.T) {
 	assert.Equal(t, 1, dataSource.called)
 	assert.Equal(t, val, v, "cache should get correct data")
 }
+
+func TestCacheForceRefresh(t *testing.T) {
+	key := "key1"
+	val := &fakeDataObj{}
+	data := map[string]*fakeDataObj{
+		key: val,
+	}
+	dataSource, cache := newFakeCache(t)
+	dataSource.set(data)
+
+	v, err := cache.Get(key, cacheReadTypeDefault)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, dataSource.called)
+	assert.Equal(t, val, v, "cache should get correct data")
+
+	v, err = cache.Get(key, cacheReadTypeForceRefresh)
+	assert.NoError(t, err)
+	assert.Equal(t, 2, dataSource.called)
+	assert.Equal(t, val, v, "should refetch unexpired data as forced refresh")
+}
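The second Get happens well inside the TTL, so asserting dataSource.called equals 2 is what proves the force-refresh path bypassed a still-valid entry and hit the data source again.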
@@ -133,19 +133,21 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua
 // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
 func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
-	getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+	getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
+		var found bool
 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 		if err != nil {
-			return "", "", nil, err
+			return "", "", nil, found, err
 		}
 
 		virtualMachines := cached.(*sync.Map)
 		if vm, ok := virtualMachines.Load(nodeName); ok {
 			result := vm.(*vmssVirtualMachinesEntry)
-			return result.vmssName, result.instanceID, result.virtualMachine, nil
+			found = true
+			return result.vmssName, result.instanceID, result.virtualMachine, found, nil
 		}
 
-		return "", "", nil, nil
+		return "", "", nil, found, nil
 	}
 
 	_, err := getScaleSetVMInstanceID(nodeName)
@@ -153,22 +155,24 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin
 		return "", "", nil, err
 	}
 
-	vmssName, instanceID, vm, err := getter(nodeName)
+	vmssName, instanceID, vm, found, err := getter(nodeName, crt)
 	if err != nil {
 		return "", "", nil, err
 	}
-	if vm != nil {
+
+	if !found {
+		klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
+		vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh)
+		if err != nil {
+			return "", "", nil, err
+		}
+	}
+
+	if found && vm != nil {
 		return vmssName, instanceID, vm, nil
 	}
 
-	klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
-	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
-	vmssName, instanceID, vm, err = getter(nodeName)
-	if err != nil {
-		return "", "", nil, err
-	}
-
-	if vm == nil {
+	if !found || vm == nil {
 		return "", "", nil, cloudprovider.InstanceNotFound
 	}
 	return vmssName, instanceID, vm, nil
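The lookup now follows a read-through-then-force-refresh pattern. A minimal sketch of that shape, with a hypothetical lookup signature standing in for the cache-backed getter:

package vmsscache

import "fmt"

type cacheReadType int

const (
	cacheReadTypeDefault cacheReadType = iota
	cacheReadTypeUnsafe
	cacheReadTypeForceRefresh
)

// lookup stands in for the cache-backed getter above; found reports whether
// the node had an entry in the cached map. (hypothetical signature)
type lookup func(key string, crt cacheReadType) (value string, found bool, err error)

// getWithRefresh mirrors the new flow: honor the caller's read type first,
// escalate to a single force refresh only on a miss, then fail with a
// not-found error if the key is still absent.
func getWithRefresh(get lookup, key string, crt cacheReadType) (string, error) {
	value, found, err := get(key, crt)
	if err != nil {
		return "", err
	}
	if !found {
		// Miss: rebuild the cache entry once and retry, instead of the old
		// approach of deleting the shared cache key and refetching.
		value, found, err = get(key, cacheReadTypeForceRefresh)
		if err != nil {
			return "", err
		}
	}
	if !found {
		return "", fmt.Errorf("instance %q not found", key)
	}
	return value, nil
}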
@@ -199,7 +203,7 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
 // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
 // The node must belong to one of scale sets.
 func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
-	getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
+	getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
 		if err != nil {
 			return nil, false, err
@@ -222,21 +226,21 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI
 		return vm, found, nil
 	}
 
-	vm, found, err := getter()
-	if err != nil {
-		return nil, err
-	}
-	if found {
-		return vm, nil
-	}
-
-	klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
-	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
-	vm, found, err = getter()
+	vm, found, err := getter(crt)
 	if err != nil {
 		return nil, err
 	}
 	if !found {
+		klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
+		vm, found, err = getter(cacheReadTypeForceRefresh)
+		if err != nil {
+			return nil, err
+		}
+	}
+	if found && vm != nil {
+		return vm, nil
+	}
+	if !found || vm == nil {
 		return nil, cloudprovider.InstanceNotFound
 	}
 
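Note the common shape of both lookup paths: the old code reacted to a miss with ss.vmssVMCache.Delete(vmssVirtualMachinesKey) followed by a refetch, discarding the shared entry for every node. Passing cacheReadTypeForceRefresh down instead lets timedCache rebuild the entry through its getter, which also keeps the previous data in the store for the old-cache snapshot taken in the hunks below.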
@@ -111,6 +111,26 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 	getter := func(key string) (interface{}, error) {
 		localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
 
+		oldCache := make(map[string]vmssVirtualMachinesEntry)
+
+		if ss.vmssVMCache != nil {
+			// get old cache before refreshing the cache
+			entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey)
+			if err != nil {
+				return nil, err
+			}
+			if exists {
+				cached := entry.(*cacheEntry).data
+				if cached != nil {
+					virtualMachines := cached.(*sync.Map)
+					virtualMachines.Range(func(key, value interface{}) bool {
+						oldCache[key.(string)] = *value.(*vmssVirtualMachinesEntry)
+						return true
+					})
+				}
+			}
+		}
+
 		allResourceGroups, err := ss.GetResourceGroups()
 		if err != nil {
 			return nil, err
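The snapshot step copies the concurrent map into a plain map before the refresh overwrites it. A standalone sketch of that Range-based copy, with a simplified stand-in for vmssVirtualMachinesEntry:

package main

import (
	"fmt"
	"sync"
	"time"
)

// vmEntry is a simplified stand-in for vmssVirtualMachinesEntry.
type vmEntry struct {
	instanceID string
	lastUpdate time.Time
}

// snapshot copies a *sync.Map keyed by node name into a plain map, the same
// Range-based copy the getter performs on the old cache data.
func snapshot(m *sync.Map) map[string]vmEntry {
	out := make(map[string]vmEntry)
	m.Range(func(key, value interface{}) bool {
		out[key.(string)] = *value.(*vmEntry)
		return true // keep iterating
	})
	return out
}

func main() {
	m := &sync.Map{}
	m.Store("node-1", &vmEntry{instanceID: "0", lastUpdate: time.Now().UTC()})
	fmt.Println(len(snapshot(m))) // 1
}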
@@ -143,8 +163,38 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 					virtualMachine: &vm,
 					lastUpdate:     time.Now().UTC(),
 				})
+
+				if _, exists := oldCache[computerName]; exists {
+					delete(oldCache, computerName)
+				}
 			}
 		}
 
+			// add old missing cache data with nil entries to prevent aggressive
+			// ARM calls during cache invalidation
+			for name, vmEntry := range oldCache {
+				// if the nil cache entry has existed for 15 minutes in the cache
+				// then it should not be added back to the cache
+				if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute {
+					klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
+					continue
+				}
+				lastUpdate := time.Now().UTC()
+				if vmEntry.virtualMachine == nil {
+					// if this is already a nil entry then keep the time the nil
+					// entry was first created, so we can cleanup unwanted entries
+					lastUpdate = vmEntry.lastUpdate
+				}
+
+				klog.V(5).Infof("adding old entries to new cache for %s", name)
+				localCache.Store(name, &vmssVirtualMachinesEntry{
+					resourceGroup:  vmEntry.resourceGroup,
+					vmssName:       vmEntry.vmssName,
+					instanceID:     vmEntry.instanceID,
+					virtualMachine: nil,
+					lastUpdate:     lastUpdate,
+				})
+			}
 		}
+
 		return localCache, nil
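The backfill rule can be restated as a small decision function. A sketch under the same simplified entry type (vm == nil marks a placeholder), not the upstream API:

package main

import (
	"fmt"
	"time"
)

// vmEntry is a simplified stand-in for vmssVirtualMachinesEntry; vm == nil
// marks a placeholder for a node that vanished from the ARM listing.
type vmEntry struct {
	vm         *string
	lastUpdate time.Time
}

// carryOver restates the backfill rule for an old entry whose node was not
// seen in the new listing: drop it once it has been a nil placeholder for
// more than 15 minutes, otherwise re-add it as a nil placeholder. An entry
// that is already nil keeps its original timestamp so the 15-minute clock
// keeps running across refreshes.
func carryOver(old vmEntry, now time.Time) (placeholder *vmEntry, keep bool) {
	if old.vm == nil && now.Sub(old.lastUpdate) > 15*time.Minute {
		return nil, false // expired placeholder, drop for good
	}
	lastUpdate := now
	if old.vm == nil {
		lastUpdate = old.lastUpdate
	}
	return &vmEntry{vm: nil, lastUpdate: lastUpdate}, true
}

func main() {
	id := "vmss-vm-0"
	now := time.Now().UTC()
	_, keep := carryOver(vmEntry{vm: &id, lastUpdate: now.Add(-time.Hour)}, now)
	fmt.Println(keep) // true: freshly missing node becomes a nil placeholder
	_, keep = carryOver(vmEntry{vm: nil, lastUpdate: now.Add(-20 * time.Minute)}, now)
	fmt.Println(keep) // false: placeholder older than 15 minutes is dropped
}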