diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go index 7670608ca14..87bec02dafb 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go @@ -61,6 +61,13 @@ type vmssMetaInfo struct { resourceGroup string } +// nodeIdentity identifies a node within a subscription. +type nodeIdentity struct { + resourceGroup string + vmssName string + nodeName string +} + // scaleSet implements VMSet interface for Azure scale set. type scaleSet struct { *Cloud @@ -70,7 +77,7 @@ type scaleSet struct { availabilitySet VMSet vmssCache *azcache.TimedCache - vmssVMCache *azcache.TimedCache + vmssVMCache *sync.Map // [resourcegroup/vmssname]*azcache.TimedCache availabilitySetNodesCache *azcache.TimedCache } @@ -80,6 +87,7 @@ func newScaleSet(az *Cloud) (VMSet, error) { ss := &scaleSet{ Cloud: az, availabilitySet: newAvailabilitySet(az), + vmssVMCache: &sync.Map{}, } if !ss.DisableAvailabilitySetNodes { @@ -94,11 +102,6 @@ func newScaleSet(az *Cloud) (VMSet, error) { return nil, err } - ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache() - if err != nil { - return nil, err - } - return ss, nil } @@ -139,12 +142,17 @@ func (ss *scaleSet) getVMSS(vmssName string, crt azcache.AzureCacheReadType) (*c return vmss, nil } -// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache. -// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets. -func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) { +// getVmssVMByNodeIdentity find virtualMachineScaleSetVM by nodeIdentity, using node's parent VMSS cache. +// Returns cloudprovider.InstanceNotFound if the node does not belong to the scale set named in nodeIdentity. +func (ss *scaleSet) getVmssVMByNodeIdentity(node *nodeIdentity, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) { + cacheKey, cache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName) + if err != nil { + return "", "", nil, err + } + getter := func(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) { var found bool - cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) + cached, err := cache.Get(cacheKey, crt) if err != nil { return "", "", nil, found, err } @@ -159,19 +167,19 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) ( return "", "", nil, found, nil } - _, err := getScaleSetVMInstanceID(nodeName) + _, err = getScaleSetVMInstanceID(node.nodeName) if err != nil { return "", "", nil, err } - vmssName, instanceID, vm, found, err := getter(nodeName, crt) + vmssName, instanceID, vm, found, err := getter(node.nodeName, crt) if err != nil { return "", "", nil, err } if !found { - klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName) - vmssName, instanceID, vm, found, err = getter(nodeName, azcache.CacheReadTypeForceRefresh) + klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", node.nodeName) + vmssName, instanceID, vm, found, err = getter(node.nodeName, azcache.CacheReadTypeForceRefresh) if err != nil { return "", "", nil, err } @@ -187,6 +195,17 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) ( return vmssName, instanceID, vm, nil } +// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache. +// Returns cloudprovider.InstanceNotFound if nodeName does not belong to any scale set. +func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) { + node, err := ss.getNodeIdentityByNodeName(nodeName, crt) + if err != nil { + return "", "", nil, err + } + + return ss.getVmssVMByNodeIdentity(node, crt) +} + // GetPowerStatusByNodeName returns the power state of the specified node. func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) { managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe) @@ -222,8 +241,13 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache. // The node must belong to one of scale sets. func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt azcache.AzureCacheReadType) (*compute.VirtualMachineScaleSetVM, error) { + cacheKey, cache, err := ss.getVMSSVMCache(resourceGroup, scaleSetName) + if err != nil { + return nil, err + } + getter := func(crt azcache.AzureCacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) { - cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) + cached, err := cache.Get(cacheKey, crt) if err != nil { return nil, false, err } @@ -590,6 +614,66 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) { return ssNames, nil } +// getNodeIdentityByNodeName use the VMSS cache to find a node's resourcegroup and vmss, returned in a nodeIdentity. +func (ss *scaleSet) getNodeIdentityByNodeName(nodeName string, crt azcache.AzureCacheReadType) (*nodeIdentity, error) { + getter := func(nodeName string, crt azcache.AzureCacheReadType) (*nodeIdentity, error) { + node := &nodeIdentity{ + nodeName: nodeName, + } + + cached, err := ss.vmssCache.Get(vmssKey, crt) + if err != nil { + return nil, err + } + + vmsses := cached.(*sync.Map) + vmsses.Range(func(key, value interface{}) bool { + v := value.(*vmssEntry) + if v.vmss.Name == nil { + return true + } + + vmssPrefix := *v.vmss.Name + if v.vmss.VirtualMachineProfile != nil && + v.vmss.VirtualMachineProfile.OsProfile != nil && + v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix != nil { + vmssPrefix = *v.vmss.VirtualMachineProfile.OsProfile.ComputerNamePrefix + } + + if strings.EqualFold(vmssPrefix, nodeName[:len(nodeName)-6]) { + node.vmssName = *v.vmss.Name + node.resourceGroup = v.resourceGroup + return false + } + + return true + }) + return node, nil + } + + if _, err := getScaleSetVMInstanceID(nodeName); err != nil { + return nil, err + } + + node, err := getter(nodeName, crt) + if err != nil { + return nil, err + } + if node.vmssName != "" { + return node, nil + } + + klog.V(2).Infof("Couldn't find VMSS for node %s, refreshing the cache", nodeName) + node, err = getter(nodeName, azcache.CacheReadTypeForceRefresh) + if err != nil { + return nil, err + } + if node.vmssName == "" { + return nil, cloudprovider.InstanceNotFound + } + return node, nil +} + // listScaleSetVMs lists VMs belonging to the specified scale set. func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) { ctx, cancel := getContextWithCancel() diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go index 94a85aade66..44dc282d417 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go @@ -20,6 +20,7 @@ package azure import ( "context" + "fmt" "strings" "sync" "time" @@ -36,7 +37,6 @@ var ( vmssNameSeparator = "_" vmssKey = "k8svmssKey" - vmssVirtualMachinesKey = "k8svmssVirtualMachinesKey" availabilitySetNodesKey = "k8sAvailabilitySetNodesKey" availabilitySetNodesCacheTTLDefaultInSeconds = 900 @@ -53,8 +53,9 @@ type vmssVirtualMachinesEntry struct { } type vmssEntry struct { - vmss *compute.VirtualMachineScaleSet - lastUpdate time.Time + vmss *compute.VirtualMachineScaleSet + resourceGroup string + lastUpdate time.Time } func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) { @@ -80,8 +81,9 @@ func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) { continue } localCache.Store(*scaleSet.Name, &vmssEntry{ - vmss: &scaleSet, - lastUpdate: time.Now().UTC(), + vmss: &scaleSet, + resourceGroup: resourceGroup, + lastUpdate: time.Now().UTC(), }) } } @@ -109,15 +111,58 @@ func extractVmssVMName(name string) (string, string, error) { return ssName, instanceID, nil } -func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) { +// getVMSSVMCache returns an *azcache.TimedCache and cache key for a VMSS (creating that cache if new). +func (ss *scaleSet) getVMSSVMCache(resourceGroup, vmssName string) (string, *azcache.TimedCache, error) { + cacheKey := strings.ToLower(fmt.Sprintf("%s/%s", resourceGroup, vmssName)) + if entry, ok := ss.vmssVMCache.Load(cacheKey); ok { + cache := entry.(*azcache.TimedCache) + return cacheKey, cache, nil + } + + cache, err := ss.newVMSSVirtualMachinesCache(resourceGroup, vmssName, cacheKey) + if err != nil { + return "", nil, err + } + ss.vmssVMCache.Store(cacheKey, cache) + return cacheKey, cache, nil +} + +// gcVMSSVMCache delete stale VMSS VMs caches from deleted VMSSes. +func (ss *scaleSet) gcVMSSVMCache() error { + cached, err := ss.vmssCache.Get(vmssKey, azcache.CacheReadTypeUnsafe) + if err != nil { + return err + } + + vmsses := cached.(*sync.Map) + removed := map[string]bool{} + ss.vmssVMCache.Range(func(key, value interface{}) bool { + cacheKey := key.(string) + vlistIdx := cacheKey[strings.LastIndex(cacheKey, "/")+1:] + if _, ok := vmsses.Load(vlistIdx); !ok { + removed[cacheKey] = true + } + return true + }) + + for key := range removed { + ss.vmssVMCache.Delete(key) + } + + return nil +} + +// newVMSSVirtualMachinesCache instanciates a new VMs cache for VMs belonging to the provided VMSS. +func (ss *scaleSet) newVMSSVirtualMachinesCache(resourceGroupName, vmssName, cacheKey string) (*azcache.TimedCache, error) { getter := func(key string) (interface{}, error) { localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry oldCache := make(map[string]vmssVirtualMachinesEntry) - if ss.vmssVMCache != nil { + if vmssCache, ok := ss.vmssVMCache.Load(cacheKey); ok { // get old cache before refreshing the cache - entry, exists, err := ss.vmssVMCache.Store.GetByKey(vmssVirtualMachinesKey) + cache := vmssCache.(*azcache.TimedCache) + entry, exists, err := cache.Store.GetByKey(cacheKey) if err != nil { return nil, err } @@ -133,75 +178,61 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) { } } - allResourceGroups, err := ss.GetResourceGroups() + vms, err := ss.listScaleSetVMs(vmssName, resourceGroupName) if err != nil { return nil, err } - for _, resourceGroup := range allResourceGroups.List() { - scaleSetNames, err := ss.listScaleSets(resourceGroup) - if err != nil { - return nil, err + for i := range vms { + vm := vms[i] + if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil { + klog.Warningf("failed to get computerName for vmssVM (%q)", vmssName) + continue } - for _, ssName := range scaleSetNames { - vms, err := ss.listScaleSetVMs(ssName, resourceGroup) - if err != nil { - return nil, err - } + computerName := strings.ToLower(*vm.OsProfile.ComputerName) + vmssVMCacheEntry := &vmssVirtualMachinesEntry{ + resourceGroup: resourceGroupName, + vmssName: vmssName, + instanceID: to.String(vm.InstanceID), + virtualMachine: &vm, + lastUpdate: time.Now().UTC(), + } + // set cache entry to nil when the VM is under deleting. + if vm.VirtualMachineScaleSetVMProperties != nil && + strings.EqualFold(to.String(vm.VirtualMachineScaleSetVMProperties.ProvisioningState), string(compute.ProvisioningStateDeleting)) { + klog.V(4).Infof("VMSS virtualMachine %q is under deleting, setting its cache to nil", computerName) + vmssVMCacheEntry.virtualMachine = nil + } + localCache.Store(computerName, vmssVMCacheEntry) - for i := range vms { - vm := vms[i] - if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil { - klog.Warningf("failed to get computerName for vmssVM (%q)", ssName) - continue - } + delete(oldCache, computerName) + } - computerName := strings.ToLower(*vm.OsProfile.ComputerName) - vmssVMCacheEntry := &vmssVirtualMachinesEntry{ - resourceGroup: resourceGroup, - vmssName: ssName, - instanceID: to.String(vm.InstanceID), - virtualMachine: &vm, - lastUpdate: time.Now().UTC(), - } - // set cache entry to nil when the VM is under deleting. - if vm.VirtualMachineScaleSetVMProperties != nil && - strings.EqualFold(to.String(vm.VirtualMachineScaleSetVMProperties.ProvisioningState), string(compute.ProvisioningStateDeleting)) { - klog.V(4).Infof("VMSS virtualMachine %q is under deleting, setting its cache to nil", computerName) - vmssVMCacheEntry.virtualMachine = nil - } - localCache.Store(computerName, vmssVMCacheEntry) - - delete(oldCache, computerName) - } + // add old missing cache data with nil entries to prevent aggressive + // ARM calls during cache invalidation + for name, vmEntry := range oldCache { + // if the nil cache entry has existed for 15 minutes in the cache + // then it should not be added back to the cache + if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute { + klog.V(5).Infof("ignoring expired entries from old cache for %s", name) + continue + } + lastUpdate := time.Now().UTC() + if vmEntry.virtualMachine == nil { + // if this is already a nil entry then keep the time the nil + // entry was first created, so we can cleanup unwanted entries + lastUpdate = vmEntry.lastUpdate } - // add old missing cache data with nil entries to prevent aggressive - // ARM calls during cache invalidation - for name, vmEntry := range oldCache { - // if the nil cache entry has existed for 15 minutes in the cache - // then it should not be added back to the cache - if vmEntry.virtualMachine == nil && time.Since(vmEntry.lastUpdate) > 15*time.Minute { - klog.V(5).Infof("ignoring expired entries from old cache for %s", name) - continue - } - lastUpdate := time.Now().UTC() - if vmEntry.virtualMachine == nil { - // if this is already a nil entry then keep the time the nil - // entry was first created, so we can cleanup unwanted entries - lastUpdate = vmEntry.lastUpdate - } - - klog.V(5).Infof("adding old entries to new cache for %s", name) - localCache.Store(name, &vmssVirtualMachinesEntry{ - resourceGroup: vmEntry.resourceGroup, - vmssName: vmEntry.vmssName, - instanceID: vmEntry.instanceID, - virtualMachine: nil, - lastUpdate: lastUpdate, - }) - } + klog.V(5).Infof("adding old entries to new cache for %s", name) + localCache.Store(name, &vmssVirtualMachinesEntry{ + resourceGroup: vmEntry.resourceGroup, + vmssName: vmEntry.vmssName, + instanceID: vmEntry.instanceID, + virtualMachine: nil, + lastUpdate: lastUpdate, + }) } return localCache, nil @@ -214,14 +245,30 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) { } func (ss *scaleSet) deleteCacheForNode(nodeName string) error { - cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, azcache.CacheReadTypeUnsafe) + node, err := ss.getNodeIdentityByNodeName(nodeName, azcache.CacheReadTypeUnsafe) if err != nil { klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err) return err } - virtualMachines := cached.(*sync.Map) + cacheKey, timedcache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName) + if err != nil { + klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err) + return err + } + + vmcache, err := timedcache.Get(cacheKey, azcache.CacheReadTypeUnsafe) + if err != nil { + klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err) + return err + } + virtualMachines := vmcache.(*sync.Map) virtualMachines.Delete(nodeName) + + if err := ss.gcVMSSVMCache(); err != nil { + klog.Errorf("deleteCacheForNode(%s) failed to gc stale vmss caches: %v", nodeName, err) + } + return nil }