mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Azure: per VMSS, incremental VMSS VMs cache
Azure's cloud provider VMSS VMs API accesses are mediated through a cache holding and refreshing all VMSS together. Due to that we hit VMSSVM.List API more often than we could: an instance's cache miss or expiration should only require a single VMSS re-list, while it's currently O(n) relative to the number of attached Scale Sets. Under hard pressure (clusters with many attached VMSS that can't all be listed in one sequence of successive API calls) the controller manager might be stuck trying to re-list everything from scratch, then aborting the whole operation; then re-trying and re-triggering API rate-limits, affecting the whole Subscription. This patch replaces the global VMSS VMs cache by per-VMSS VMs caches. Refreshes (VMSS VMs lists) are scoped to the single relevant VMSS; under severe throttling the various caches can be incrementally refreshed. Signed-off-by: Benjamin Pineau <benjamin.pineau@datadoghq.com>
This commit is contained in:
parent
43fbe17dc6
commit
85ecd0e17c
@ -61,6 +61,13 @@ type vmssMetaInfo struct {
|
||||
resourceGroup string
|
||||
}
|
||||
|
||||
// nodeIdentity identifies a node within a subscription.
|
||||
type nodeIdentity struct {
|
||||
resourceGroup string
|
||||
vmssName string
|
||||
nodeName string
|
||||
}
|
||||
|
||||
// scaleSet implements VMSet interface for Azure scale set.
|
||||
type scaleSet struct {
|
||||
*Cloud
|
||||
@ -70,7 +77,7 @@ type scaleSet struct {
|
||||
availabilitySet VMSet
|
||||
|
||||
vmssCache *azcache.TimedCache
|
||||
vmssVMCache *azcache.TimedCache
|
||||
vmssVMCache *sync.Map // [resourcegroup/vmssname]*azcache.TimedCache
|
||||
availabilitySetNodesCache *azcache.TimedCache
|
||||
}
|
||||
|
||||
@ -80,6 +87,7 @@ func newScaleSet(az *Cloud) (VMSet, error) {
|
||||
ss := &scaleSet{
|
||||
Cloud: az,
|
||||
availabilitySet: newAvailabilitySet(az),
|
||||
vmssVMCache: &sync.Map{},
|
||||
}
|
||||
|
||||
if !ss.DisableAvailabilitySetNodes {
|
||||
@ -94,11 +102,6 @@ func newScaleSet(az *Cloud) (VMSet, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
@ -139,12 +142,17 @@ func (ss *scaleSet) getVMSS(vmssName string, crt azcache.AzureCacheReadType) (*c
|
||||
return vmss, nil
|
||||
}
|
||||
|
||||
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
|
||||
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
|
||||
func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
|
||||
// getVmssVMByNodeIdentity finds virtualMachineScaleSetVM by nodeIdentity, using the node's parent VMSS cache.
|
||||
// Returns cloudprovider.InstanceNotFound if the node does not belong to the scale set named in nodeIdentity.
|
||||
func (ss *scaleSet) getVmssVMByNodeIdentity(node *nodeIdentity, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
|
||||
cacheKey, cache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
|
||||
getter := func(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) {
|
||||
var found bool
|
||||
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
|
||||
cached, err := cache.Get(cacheKey, crt)
|
||||
if err != nil {
|
||||
return "", "", nil, found, err
|
||||
}
|
||||
@ -159,19 +167,19 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (
|
||||
return "", "", nil, found, nil
|
||||
}
|
||||
|
||||
_, err := getScaleSetVMInstanceID(nodeName)
|
||||
_, err = getScaleSetVMInstanceID(node.nodeName)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
|
||||
vmssName, instanceID, vm, found, err := getter(nodeName, crt)
|
||||
vmssName, instanceID, vm, found, err := getter(node.nodeName, crt)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
|
||||
if !found {
|
||||
klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
|
||||
vmssName, instanceID, vm, found, err = getter(nodeName, azcache.CacheReadTypeForceRefresh)
|
||||
klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", node.nodeName)
|
||||
vmssName, instanceID, vm, found, err = getter(node.nodeName, azcache.CacheReadTypeForceRefresh)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
@ -187,6 +195,17 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (
|
||||
return vmssName, instanceID, vm, nil
|
||||
}
|
||||
|
||||
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
|
||||
// Returns cloudprovider.InstanceNotFound if nodeName does not belong to any scale set.
|
||||
func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) {
|
||||
node, err := ss.getNodeIdentityByNodeName(nodeName, crt)
|
||||
if err != nil {
|
||||
return "", "", nil, err
|
||||
}
|
||||
|
||||
return ss.getVmssVMByNodeIdentity(node, crt)
|
||||
}
|
||||
|
||||
// GetPowerStatusByNodeName returns the power state of the specified node.
|
||||
func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
|
||||
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe)
|
||||
@ -222,8 +241,13 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
|
||||
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
|
||||
// The node must belong to one of scale sets.
|
||||
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt azcache.AzureCacheReadType) (*compute.VirtualMachineScaleSetVM, error) {
|
||||
cacheKey, cache, err := ss.getVMSSVMCache(resourceGroup, scaleSetName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
getter := func(crt azcache.AzureCacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
|
||||
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt)
|
||||
cached, err := cache.Get(cacheKey, crt)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
@ -590,6 +614,66 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
|
||||
return ssNames, nil
|
||||
}
|
||||
|
||||
// getNodeIdentityByNodeName uses the VMSS cache to find a node's resource group and VMSS, returned in a nodeIdentity.
|
||||
func (ss *scaleSet) getNodeIdentityByNodeName(nodeName string, crt azcache.AzureCacheReadType) (*nodeIdentity, error) {
|
||||
getter := func(nodeName string, crt azcache.AzureCacheReadType) (*nodeIdentity, error) {
|
||||
node := &nodeIdentity{
|
||||
nodeName: nodeName,
|
||||
}
|
||||
|
||||
cached, err := ss.vmssCache.Get(vmssKey, crt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vmsses := cached.(*sync.Map)
|
||||
vmsses.Range(func(key, value interface{}) bool {
|
||||
v := value.(*vmssEntry)
|
||||
if v.vmss.Name == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
vmssPrefix := *v.vmss.Name
|
||||
if v.vmss.VirtualMachineProfile != nil &&
|
||||
v.vmss.VirtualMachineProfile.OsProfile != nil &&
|
||||
v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix != nil {
|
||||
vmssPrefix = *v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix
|
||||
}
|
||||
|
||||
if strings.EqualFold(vmssPrefix, nodeName[:len(nodeName)-6]) {
|
||||
node.vmssName = *v.vmss.Name
|
||||
node.resourceGroup = v.resourceGroup
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
return node, nil
|
||||
}
|
||||
|
||||
if _, err := getScaleSetVMInstanceID(nodeName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
node, err := getter(nodeName, crt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if node.vmssName != "" {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
klog.V(2).Infof("Couldn't find VMSS for node %s, refreshing the cache", nodeName)
|
||||
node, err = getter(nodeName, azcache.CacheReadTypeForceRefresh)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if node.vmssName == "" {
|
||||
return nil, cloudprovider.InstanceNotFound
|
||||
}
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// listScaleSetVMs lists VMs belonging to the specified scale set.
|
||||
func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) {
|
||||
ctx, cancel := getContextWithCancel()
|
||||
|
@ -20,6 +20,7 @@ package azure
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -36,7 +37,6 @@ var (
|
||||
vmssNameSeparator = "_"
|
||||
|
||||
vmssKey = "k8svmssKey"
|
||||
vmssVirtualMachinesKey = "k8svmssVirtualMachinesKey"
|
||||
availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
|
||||
|
||||
availabilitySetNodesCacheTTLDefaultInSeconds = 900
|
||||
@ -53,8 +53,9 @@ type vmssVirtualMachinesEntry struct {
|
||||
}
|
||||
|
||||
type vmssEntry struct {
|
||||
vmss *compute.VirtualMachineScaleSet
|
||||
lastUpdate time.Time
|
||||
vmss *compute.VirtualMachineScaleSet
|
||||
resourceGroup string
|
||||
lastUpdate time.Time
|
||||
}
|
||||
|
||||
func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) {
|
||||
@ -80,8 +81,9 @@ func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) {
|
||||
continue
|
||||
}
|
||||
localCache.Store(*scaleSet.Name, &vmssEntry{
|
||||
vmss: &scaleSet,
|
||||
lastUpdate: time.Now().UTC(),
|
||||
vmss: &scaleSet,
|
||||
resourceGroup: resourceGroup,
|
||||
lastUpdate: time.Now().UTC(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -109,15 +111,58 @@ func extractVmssVMName(name string) (string, string, error) {
|
||||
return ssName, instanceID, nil
|
||||
}
|
||||
|
||||
func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
|
||||
// getVMSSVMCache returns an *azcache.TimedCache and cache key for a VMSS (creating that cache if new).
|
||||
func (ss *scaleSet) getVMSSVMCache(resourceGroup, vmssName string) (string, *azcache.TimedCache, error) {
|
||||
cacheKey := strings.ToLower(fmt.Sprintf("%s/%s", resourceGroup, vmssName))
|
||||
if entry, ok := ss.vmssVMCache.Load(cacheKey); ok {
|
||||
cache := entry.(*azcache.TimedCache)
|
||||
return cacheKey, cache, nil
|
||||
}
|
||||
|
||||
cache, err := ss.newVMSSVirtualMachinesCache(resourceGroup, vmssName, cacheKey)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
ss.vmssVMCache.Store(cacheKey, cache)
|
||||
return cacheKey, cache, nil
|
||||
}
|
||||
|
||||
// gcVMSSVMCache deletes stale VMSS VM caches belonging to deleted VMSSes.
|
||||
func (ss *scaleSet) gcVMSSVMCache() error {
|
||||
cached, err := ss.vmssCache.Get(vmssKey, azcache.CacheReadTypeUnsafe)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
vmsses := cached.(*sync.Map)
|
||||
removed := map[string]bool{}
|
||||
ss.vmssVMCache.Range(func(key, value interface{}) bool {
|
||||
cacheKey := key.(string)
|
||||
vlistIdx := cacheKey[strings.LastIndex(cacheKey, "/")+1:]
|
||||
if _, ok := vmsses.Load(vlistIdx); !ok {
|
||||
removed[cacheKey] = true
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
for key := range removed {
|
||||
ss.vmssVMCache.Delete(key)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// newVMSSVirtualMachinesCache instantiates a new VMs cache for VMs belonging to the provided VMSS.
|
||||
func (ss *scaleSet) newVMSSVirtualMachinesCache(resourceGroupName, vmssName, cacheKey string) (*azcache.TimedCache, error) {
|
||||
getter := func(key string) (interface{}, error) {
|
||||
localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
|
||||
|
||||
oldCache := make(map[string]vmssVirtualMachinesEntry)
|
||||
|
||||
if ss.vmssVMCache != nil {
|
||||
if vmssCache, ok := ss.vmssVMCache.Load(cacheKey); ok {
|
||||
// get old cache before refreshing the cache
|
||||
entry, exists, err := ss.vmssVMCache.Store.GetByKey(vmssVirtualMachinesKey)
|
||||
cache := vmssCache.(*azcache.TimedCache)
|
||||
entry, exists, err := cache.Store.GetByKey(cacheKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -133,75 +178,61 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
|
||||
}
|
||||
}
|
||||
|
||||
allResourceGroups, err := ss.GetResourceGroups()
|
||||
vms, err := ss.listScaleSetVMs(vmssName, resourceGroupName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, resourceGroup := range allResourceGroups.List() {
|
||||
scaleSetNames, err := ss.listScaleSets(resourceGroup)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for i := range vms {
|
||||
vm := vms[i]
|
||||
if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
|
||||
klog.Warningf("failed to get computerName for vmssVM (%q)", vmssName)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, ssName := range scaleSetNames {
|
||||
vms, err := ss.listScaleSetVMs(ssName, resourceGroup)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
computerName := strings.ToLower(*vm.OsProfile.ComputerName)
|
||||
vmssVMCacheEntry := &vmssVirtualMachinesEntry{
|
||||
resourceGroup: resourceGroupName,
|
||||
vmssName: vmssName,
|
||||
instanceID: to.String(vm.InstanceID),
|
||||
virtualMachine: &vm,
|
||||
lastUpdate: time.Now().UTC(),
|
||||
}
|
||||
// set cache entry to nil when the VM is under deleting.
|
||||
if vm.VirtualMachineScaleSetVMProperties != nil &&
|
||||
strings.EqualFold(to.String(vm.VirtualMachineScaleSetVMProperties.ProvisioningState), string(compute.ProvisioningStateDeleting)) {
|
||||
klog.V(4).Infof("VMSS virtualMachine %q is under deleting, setting its cache to nil", computerName)
|
||||
vmssVMCacheEntry.virtualMachine = nil
|
||||
}
|
||||
localCache.Store(computerName, vmssVMCacheEntry)
|
||||
|
||||
for i := range vms {
|
||||
vm := vms[i]
|
||||
if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
|
||||
klog.Warningf("failed to get computerName for vmssVM (%q)", ssName)
|
||||
continue
|
||||
}
|
||||
delete(oldCache, computerName)
|
||||
}
|
||||
|
||||
computerName := strings.ToLower(*vm.OsProfile.ComputerName)
|
||||
vmssVMCacheEntry := &vmssVirtualMachinesEntry{
|
||||
resourceGroup: resourceGroup,
|
||||
vmssName: ssName,
|
||||
instanceID: to.String(vm.InstanceID),
|
||||
virtualMachine: &vm,
|
||||
lastUpdate: time.Now().UTC(),
|
||||
}
|
||||
// set cache entry to nil when the VM is under deleting.
|
||||
if vm.VirtualMachineScaleSetVMProperties != nil &&
|
||||
strings.EqualFold(to.String(vm.VirtualMachineScaleSetVMProperties.ProvisioningState), string(compute.ProvisioningStateDeleting)) {
|
||||
klog.V(4).Infof("VMSS virtualMachine %q is under deleting, setting its cache to nil", computerName)
|
||||
vmssVMCacheEntry.virtualMachine = nil
|
||||
}
|
||||
localCache.Store(computerName, vmssVMCacheEntry)
|
||||
|
||||
delete(oldCache, computerName)
|
||||
}
|
||||
// add old missing cache data with nil entries to prevent aggressive
|
||||
// ARM calls during cache invalidation
|
||||
for name, vmEntry := range oldCache {
|
||||
// if the nil cache entry has existed for 15 minutes in the cache
|
||||
// then it should not be added back to the cache
|
||||
if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute {
|
||||
klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
|
||||
continue
|
||||
}
|
||||
lastUpdate := time.Now().UTC()
|
||||
if vmEntry.virtualMachine == nil {
|
||||
// if this is already a nil entry then keep the time the nil
|
||||
// entry was first created, so we can cleanup unwanted entries
|
||||
lastUpdate = vmEntry.lastUpdate
|
||||
}
|
||||
|
||||
// add old missing cache data with nil entries to prevent aggressive
|
||||
// ARM calls during cache invalidation
|
||||
for name, vmEntry := range oldCache {
|
||||
// if the nil cache entry has existed for 15 minutes in the cache
|
||||
// then it should not be added back to the cache
|
||||
if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute {
|
||||
klog.V(5).Infof("ignoring expired entries from old cache for %s", name)
|
||||
continue
|
||||
}
|
||||
lastUpdate := time.Now().UTC()
|
||||
if vmEntry.virtualMachine == nil {
|
||||
// if this is already a nil entry then keep the time the nil
|
||||
// entry was first created, so we can cleanup unwanted entries
|
||||
lastUpdate = vmEntry.lastUpdate
|
||||
}
|
||||
|
||||
klog.V(5).Infof("adding old entries to new cache for %s", name)
|
||||
localCache.Store(name, &vmssVirtualMachinesEntry{
|
||||
resourceGroup: vmEntry.resourceGroup,
|
||||
vmssName: vmEntry.vmssName,
|
||||
instanceID: vmEntry.instanceID,
|
||||
virtualMachine: nil,
|
||||
lastUpdate: lastUpdate,
|
||||
})
|
||||
}
|
||||
klog.V(5).Infof("adding old entries to new cache for %s", name)
|
||||
localCache.Store(name, &vmssVirtualMachinesEntry{
|
||||
resourceGroup: vmEntry.resourceGroup,
|
||||
vmssName: vmEntry.vmssName,
|
||||
instanceID: vmEntry.instanceID,
|
||||
virtualMachine: nil,
|
||||
lastUpdate: lastUpdate,
|
||||
})
|
||||
}
|
||||
|
||||
return localCache, nil
|
||||
@ -214,14 +245,30 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) {
|
||||
}
|
||||
|
||||
func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
|
||||
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, azcache.CacheReadTypeUnsafe)
|
||||
node, err := ss.getNodeIdentityByNodeName(nodeName, azcache.CacheReadTypeUnsafe)
|
||||
if err != nil {
|
||||
klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
virtualMachines := cached.(*sync.Map)
|
||||
cacheKey, timedcache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName)
|
||||
if err != nil {
|
||||
klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
vmcache, err := timedcache.Get(cacheKey, azcache.CacheReadTypeUnsafe)
|
||||
if err != nil {
|
||||
klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err)
|
||||
return err
|
||||
}
|
||||
virtualMachines := vmcache.(*sync.Map)
|
||||
virtualMachines.Delete(nodeName)
|
||||
|
||||
if err := ss.gcVMSSVMCache(); err != nil {
|
||||
klog.Errorf("deleteCacheForNode(%s) failed to gc stale vmss caches: %v", nodeName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user