diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go
index 5115d8043ec..0d8468dfd7d 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_controller_vmss.go
@@ -86,8 +86,9 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
 	defer cancel()
 
 	// Invalidate the cache right after updating
-	key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
-	defer ss.vmssVMCache.Delete(key)
+	if err = ss.deleteCacheForNode(vmName); err != nil {
+		return err
+	}
 
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
 	_, err = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "attach_disk")
@@ -157,8 +158,9 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
 	defer cancel()
 
 	// Invalidate the cache right after updating
-	key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
-	defer ss.vmssVMCache.Delete(key)
+	if err = ss.deleteCacheForNode(vmName); err != nil {
+		return nil, err
+	}
 
 	klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
 	return ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "detach_disk")
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
index 040de1d88b0..8feac08103b 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss.go
@@ -25,6 +25,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
 	"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
@@ -60,10 +61,8 @@ type scaleSet struct {
 	// (e.g. master nodes) may not belong to any scale sets.
 	availabilitySet VMSet
 
-	vmssCache                      *timedCache
-	vmssVMCache                    *timedCache
-	nodeNameToScaleSetMappingCache *timedCache
-	availabilitySetNodesCache      *timedCache
+	vmssVMCache               *timedCache
+	availabilitySetNodesCache *timedCache
 }
 
 // newScaleSet creates a new scaleSet.
@@ -74,22 +73,12 @@ func newScaleSet(az *Cloud) (VMSet, error) {
 		availabilitySet: newAvailabilitySet(az),
 	}
 
-	ss.nodeNameToScaleSetMappingCache, err = ss.newNodeNameToScaleSetMappingCache()
-	if err != nil {
-		return nil, err
-	}
-
 	ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
 	if err != nil {
 		return nil, err
 	}
 
-	ss.vmssCache, err = ss.newVmssCache()
-	if err != nil {
-		return nil, err
-	}
-
-	ss.vmssVMCache, err = ss.newVmssVMCache()
+	ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
 	if err != nil {
 		return nil, err
 	}
@@ -99,39 +88,46 @@ func newScaleSet(az *Cloud) (VMSet, error) {
 
 // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
 // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
-func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm compute.VirtualMachineScaleSetVM, err error) {
-	instanceID, err = getScaleSetVMInstanceID(nodeName)
+func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+	getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
+		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+		if err != nil {
+			return "", "", nil, err
+		}
+
+		virtualMachines := cached.(*sync.Map)
+		if vm, ok := virtualMachines.Load(nodeName); ok {
+			result := vm.(*vmssVirtualMachinesEntry)
+			return result.vmssName, result.instanceID, result.virtualMachine, nil
+		}
+
+		return "", "", nil, nil
+	}
+
+	_, err := getScaleSetVMInstanceID(nodeName)
 	if err != nil {
-		return ssName, instanceID, vm, err
+		return "", "", nil, err
 	}
 
-	ssName, err = ss.getScaleSetNameByNodeName(nodeName)
+	vmssName, instanceID, vm, err := getter(nodeName)
 	if err != nil {
-		return ssName, instanceID, vm, err
+		return "", "", nil, err
+	}
+	if vm != nil {
+		return vmssName, instanceID, vm, nil
 	}
 
-	if ssName == "" {
-		return "", "", vm, cloudprovider.InstanceNotFound
-	}
-
-	resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
+	klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
+	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
+	vmssName, instanceID, vm, err = getter(nodeName)
 	if err != nil {
-		return "", "", vm, err
+		return "", "", nil, err
 	}
 
-	klog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
-	key := buildVmssCacheKey(resourceGroup, ss.makeVmssVMName(ssName, instanceID))
-	cachedVM, err := ss.vmssVMCache.Get(key)
-	if err != nil {
-		return ssName, instanceID, vm, err
+	if vm == nil {
+		return "", "", nil, cloudprovider.InstanceNotFound
 	}
-
-	if cachedVM == nil {
-		klog.Errorf("Can't find node (%q) in any scale sets", nodeName)
-		return ssName, instanceID, vm, cloudprovider.InstanceNotFound
-	}
-
-	return ssName, instanceID, *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
+	return vmssName, instanceID, vm, nil
 }
 
 // GetPowerStatusByNodeName returns the power state of the specified node.
@@ -158,20 +154,49 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
 
 // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
 // The node must belong to one of scale sets.
-func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
-	vmName := ss.makeVmssVMName(scaleSetName, instanceID)
-	key := buildVmssCacheKey(resourceGroup, vmName)
-	cachedVM, err := ss.vmssVMCache.Get(key)
+func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (*compute.VirtualMachineScaleSetVM, error) {
+	getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
+		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+		if err != nil {
+			return nil, false, err
+		}
+
+		virtualMachines := cached.(*sync.Map)
+		virtualMachines.Range(func(key, value interface{}) bool {
+			vmEntry := value.(*vmssVirtualMachinesEntry)
+			if strings.EqualFold(vmEntry.resourceGroup, resourceGroup) &&
+				strings.EqualFold(vmEntry.vmssName, scaleSetName) &&
+				strings.EqualFold(vmEntry.instanceID, instanceID) {
+				vm = vmEntry.virtualMachine
+				found = true
+				return false
+			}
+
+			return true
+		})
+
+		return vm, found, nil
+	}
+
+	vm, found, err := getter()
 	if err != nil {
-		return vm, err
+		return nil, err
+	}
+	if found {
+		return vm, nil
 	}
 
-	if cachedVM == nil {
-		klog.Errorf("couldn't find vmss virtual machine by scaleSetName (%s) and instanceID (%s)", scaleSetName, instanceID)
-		return vm, cloudprovider.InstanceNotFound
+	klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
+	ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
+	vm, found, err = getter()
+	if err != nil {
+		return nil, err
+	}
+	if !found {
+		return nil, cloudprovider.InstanceNotFound
 	}
 
-	return *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
+	return vm, nil
 }
 
 // GetInstanceIDByNodeName gets the cloud provider ID by node name.
@@ -463,9 +488,15 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
 		return nil, err
 	}
 
-	ssNames := make([]string, len(allScaleSets))
-	for i := range allScaleSets {
-		ssNames[i] = *(allScaleSets[i].Name)
+	ssNames := make([]string, 0)
+	for _, vmss := range allScaleSets {
+		name := *vmss.Name
+		if vmss.Sku != nil && to.Int64(vmss.Sku.Capacity) == 0 {
+			klog.V(3).Infof("Capacity of VMSS %q is 0, skipping", name)
+			continue
+		}
+
+		ssNames = append(ssNames, name)
 	}
 
 	return ssNames, nil
@@ -500,7 +531,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
 		}
 
 		nodeName := nodes[nx].Name
-		ssName, err := ss.getScaleSetNameByNodeName(nodeName)
+		ssName, _, _, err := ss.getVmssVM(nodeName)
 		if err != nil {
 			return nil, err
 		}
@@ -599,7 +630,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
 		return network.Interface{}, err
 	}
 
-	primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
+	primaryInterfaceID, err := ss.getPrimaryInterfaceID(*vm)
 	if err != nil {
 		klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getPrimaryInterfaceID(), err=%v", nodeName, err)
 		return network.Interface{}, err
@@ -816,8 +847,9 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
 	}
 
 	// Invalidate the cache since we would update it.
-	key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
-	defer ss.vmssVMCache.Delete(key)
+	if err = ss.deleteCacheForNode(vmName); err != nil {
+		return err
+	}
 
 	// Update vmssVM with backoff.
 	ctx, cancel := getContextWithCancel()
@@ -1094,8 +1126,9 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
 	}
 
 	// Invalidate the cache since we would update it.
-	key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
-	defer ss.vmssVMCache.Delete(key)
+	if err = ss.deleteCacheForNode(nodeName); err != nil {
+		return err
+	}
 
 	// Update vmssVM with backoff.
 	ctx, cancel := getContextWithCancel()
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
index c6fed0a8f80..398dfed0ec5 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache.go
@@ -21,9 +21,12 @@ package azure
 import (
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
+	"github.com/Azure/go-autorest/autorest/to"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog"
 )
@@ -32,18 +35,19 @@ var (
 	vmssNameSeparator  = "_"
 	vmssCacheSeparator = "#"
 
-	nodeNameToScaleSetMappingKey = "k8sNodeNameToScaleSetMappingKey"
-	availabilitySetNodesKey      = "k8sAvailabilitySetNodesKey"
+	vmssVirtualMachinesKey  = "k8svmssVirtualMachinesKey"
+	availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
 
-	vmssCacheTTL                      = time.Minute
-	vmssVMCacheTTL                    = time.Minute
-	availabilitySetNodesCacheTTL      = 5 * time.Minute
-	nodeNameToScaleSetMappingCacheTTL = 5 * time.Minute
+	availabilitySetNodesCacheTTL = 15 * time.Minute
+	vmssVirtualMachinesTTL       = 10 * time.Minute
 )
 
-// nodeNameToScaleSetMapping maps nodeName to scaleSet name.
-// The map is required because vmss nodeName is not equal to its vmName.
-type nodeNameToScaleSetMapping map[string]string
+type vmssVirtualMachinesEntry struct {
+	resourceGroup  string
+	vmssName       string
+	instanceID     string
+	virtualMachine *compute.VirtualMachineScaleSetVM
+}
 
 func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string {
 	return fmt.Sprintf("%s%s%s", scaleSetName, vmssNameSeparator, instanceID)
@@ -63,32 +67,9 @@ func extractVmssVMName(name string) (string, string, error) {
 	return ssName, instanceID, nil
 }
 
-// vmssCache only holds vmss from ss.ResourceGroup because nodes from other resourceGroups
-// will be excluded from LB backends.
-func (ss *scaleSet) newVmssCache() (*timedCache, error) {
+func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
 	getter := func(key string) (interface{}, error) {
-		ctx, cancel := getContextWithCancel()
-		defer cancel()
-		result, err := ss.VirtualMachineScaleSetsClient.Get(ctx, ss.ResourceGroup, key)
-		exists, message, realErr := checkResourceExistsFromError(err)
-		if realErr != nil {
-			return nil, realErr
-		}
-
-		if !exists {
-			klog.V(2).Infof("Virtual machine scale set %q not found with message: %q", key, message)
-			return nil, nil
-		}
-
-		return &result, nil
-	}
-
-	return newTimedcache(vmssCacheTTL, getter)
-}
-
-func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
-	getter := func(key string) (interface{}, error) {
-		localCache := make(nodeNameToScaleSetMapping)
+		localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
 
 		allResourceGroups, err := ss.GetResourceGroups()
 		if err != nil {
@@ -107,14 +88,20 @@ func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
 					return nil, err
 				}
 
-				for _, vm := range vms {
+				for i := range vms {
+					vm := vms[i]
 					if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
 						klog.Warningf("failed to get computerName for vmssVM (%q)", ssName)
 						continue
 					}
 
 					computerName := strings.ToLower(*vm.OsProfile.ComputerName)
-					localCache[computerName] = ssName
+					localCache.Store(computerName, &vmssVirtualMachinesEntry{
+						resourceGroup:  resourceGroup,
+						vmssName:       ssName,
+						instanceID:     to.String(vm.InstanceID),
+						virtualMachine: &vm,
+					})
 				}
 			}
 		}
@@ -122,7 +109,18 @@ func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
 		return localCache, nil
 	}
 
-	return newTimedcache(nodeNameToScaleSetMappingCacheTTL, getter)
+	return newTimedcache(vmssVirtualMachinesTTL, getter)
+}
+
+func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
+	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+	if err != nil {
+		return err
+	}
+
+	virtualMachines := cached.(*sync.Map)
+	virtualMachines.Delete(nodeName)
+	return nil
 }
 
 func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
@@ -152,91 +150,6 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
 	return newTimedcache(availabilitySetNodesCacheTTL, getter)
 }
 
-func buildVmssCacheKey(resourceGroup, name string) string {
-	// key is composed of <resourceGroup>#<vmName>
-	return fmt.Sprintf("%s%s%s", strings.ToLower(resourceGroup), vmssCacheSeparator, name)
-}
-
-func extractVmssCacheKey(key string) (string, string, error) {
-	// key is composed of <resourceGroup>#<vmName>
-	keyItems := strings.Split(key, vmssCacheSeparator)
-	if len(keyItems) != 2 {
-		return "", "", fmt.Errorf("key %q is not in format '<resourceGroup>#<vmName>'", key)
-	}
-
-	resourceGroup := keyItems[0]
-	vmName := keyItems[1]
-	return resourceGroup, vmName, nil
-}
-
-func (ss *scaleSet) newVmssVMCache() (*timedCache, error) {
-	getter := func(key string) (interface{}, error) {
-		// key is composed of <resourceGroup>#<vmName>
-		resourceGroup, vmName, err := extractVmssCacheKey(key)
-		if err != nil {
-			return nil, err
-		}
-
-		// vmName's format is 'scaleSetName_instanceID'
-		ssName, instanceID, err := extractVmssVMName(vmName)
-		if err != nil {
-			return nil, err
-		}
-
-		// Not found, the VM doesn't belong to any known scale sets.
-		if ssName == "" {
-			return nil, nil
-		}
-
-		ctx, cancel := getContextWithCancel()
-		defer cancel()
-		result, err := ss.VirtualMachineScaleSetVMsClient.Get(ctx, resourceGroup, ssName, instanceID, compute.InstanceView)
-		exists, message, realErr := checkResourceExistsFromError(err)
-		if realErr != nil {
-			return nil, realErr
-		}
-
-		if !exists {
-			klog.V(2).Infof("Virtual machine scale set VM %q not found with message: %q", key, message)
-			return nil, nil
-		}
-
-		return &result, nil
-	}
-
-	return newTimedcache(vmssVMCacheTTL, getter)
-}
-
-func (ss *scaleSet) getScaleSetNameByNodeName(nodeName string) (string, error) {
-	getScaleSetName := func(nodeName string) (string, error) {
-		nodeNameMapping, err := ss.nodeNameToScaleSetMappingCache.Get(nodeNameToScaleSetMappingKey)
-		if err != nil {
-			return "", err
-		}
-
-		realMapping := nodeNameMapping.(nodeNameToScaleSetMapping)
-		if ssName, ok := realMapping[nodeName]; ok {
-			return ssName, nil
-		}
-
-		return "", nil
-	}
-
-	ssName, err := getScaleSetName(nodeName)
-	if err != nil {
-		return "", err
-	}
-
-	if ssName != "" {
-		return ssName, nil
-	}
-
-	// ssName is still not found, it is likely that new Nodes are created.
-	// Force refresh the cache and try again.
-	ss.nodeNameToScaleSetMappingCache.Delete(nodeNameToScaleSetMappingKey)
-	return getScaleSetName(nodeName)
-}
-
 func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string) (bool, error) {
 	cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey)
 	if err != nil {
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
index bf70b4ddd4e..c6596d7bc64 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_vmss_cache_test.go
@@ -19,8 +19,11 @@ limitations under the License.
 package azure
 
 import (
+	"context"
+	"sync"
 	"testing"
 
+	"github.com/Azure/go-autorest/autorest/to"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -67,3 +70,45 @@ func TestExtractVmssVMName(t *testing.T) {
 		assert.Equal(t, c.expectedInstanceID, instanceID, c.description)
 	}
 }
+
+func TestVMSSVMCache(t *testing.T) {
+	vmssName := "vmss"
+	vmList := []string{"vmssee6c2000000", "vmssee6c2000001", "vmssee6c2000002"}
+	ss, err := newTestScaleSet(vmssName, "", 0, vmList)
+	assert.NoError(t, err)
+
+	// validate getting VMSS VM via cache.
+	virtualMachines, err := ss.VirtualMachineScaleSetVMsClient.List(
+		context.Background(), "rg", "vmss", "", "", "")
+	assert.NoError(t, err)
+	assert.Equal(t, 3, len(virtualMachines))
+	for i := range virtualMachines {
+		vm := virtualMachines[i]
+		vmName := to.String(vm.OsProfile.ComputerName)
+		ssName, instanceID, realVM, err := ss.getVmssVM(vmName)
+		assert.NoError(t, err)
+		assert.Equal(t, "vmss", ssName)
+		assert.Equal(t, to.String(vm.InstanceID), instanceID)
+		assert.Equal(t, &vm, realVM)
+	}
+
+	// validate deleteCacheForNode().
+	vm := virtualMachines[0]
+	vmName := to.String(vm.OsProfile.ComputerName)
+	err = ss.deleteCacheForNode(vmName)
+	assert.NoError(t, err)
+
+	// the VM should be removed from cache after deleteCacheForNode().
+	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
+	assert.NoError(t, err)
+	cachedVirtualMachines := cached.(*sync.Map)
+	_, ok := cachedVirtualMachines.Load(vmName)
+	assert.Equal(t, false, ok)
+
+	// the VM should be found again after another cache refresh.
+	ssName, instanceID, realVM, err := ss.getVmssVM(vmName)
+	assert.NoError(t, err)
+	assert.Equal(t, "vmss", ssName)
+	assert.Equal(t, to.String(vm.InstanceID), instanceID)
+	assert.Equal(t, &vm, realVM)
+}
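
The patch above collapses the old per-VM vmssVMCache (keyed by resourceGroup#vmName) and the nodeName-to-scale-set mapping into a single timedCache entry, vmssVirtualMachinesKey, whose value is a sync.Map of nodeName to *vmssVirtualMachinesEntry built by listing every scale set VM in every resource group. Lookups consult that map first and, on a miss, drop the key so the next Get rebuilds it before one retry. The standalone sketch below only illustrates that lookup-with-refresh pattern; mapCache, entry, and getVM are simplified stand-ins invented for this example, not the timedCache, vmssVirtualMachinesEntry, or getVmssVM implementations from the patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry mirrors the shape of vmssVirtualMachinesEntry above: everything
// needed to answer a node lookup without another ARM call.
type entry struct {
	resourceGroup string
	vmssName      string
	instanceID    string
}

// mapCache is a simplified stand-in for timedCache: it lazily builds a
// *sync.Map of nodeName -> *entry and rebuilds it after Delete or TTL expiry.
type mapCache struct {
	mu      sync.Mutex
	data    *sync.Map
	expires time.Time
	ttl     time.Duration
	build   func() (*sync.Map, error)
}

func (c *mapCache) Get() (*sync.Map, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.data == nil || time.Now().After(c.expires) {
		m, err := c.build()
		if err != nil {
			return nil, err
		}
		c.data, c.expires = m, time.Now().Add(c.ttl)
	}
	return c.data, nil
}

// Delete drops the cached map so the next Get rebuilds it, mimicking
// ss.vmssVMCache.Delete(vmssVirtualMachinesKey) in the patch.
func (c *mapCache) Delete() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data = nil
}

// getVM follows the getter/refresh pattern of getVmssVM: consult the cached
// map, invalidate and retry once on a miss, then report "instance not found".
func getVM(c *mapCache, nodeName string) (*entry, error) {
	getter := func() (*entry, error) {
		cached, err := c.Get()
		if err != nil {
			return nil, err
		}
		if v, ok := cached.Load(nodeName); ok {
			return v.(*entry), nil
		}
		return nil, nil
	}

	vm, err := getter()
	if err != nil || vm != nil {
		return vm, err
	}

	// Cache miss: the node may have been created after the last refresh.
	c.Delete()
	if vm, err = getter(); err != nil || vm != nil {
		return vm, err
	}
	return nil, fmt.Errorf("instance not found: %s", nodeName)
}

func main() {
	cache := &mapCache{
		ttl: 10 * time.Minute,
		build: func() (*sync.Map, error) {
			// The real provider lists every VMSS VM in every resource
			// group here; this is a fixed fake inventory instead.
			m := &sync.Map{}
			m.Store("vmssee6c2000000", &entry{resourceGroup: "rg", vmssName: "vmss", instanceID: "0"})
			return m, nil
		},
	}

	if vm, err := getVM(cache, "vmssee6c2000000"); err == nil {
		fmt.Printf("found %s/%s instance %s\n", vm.resourceGroup, vm.vmssName, vm.instanceID)
	}
	if _, err := getVM(cache, "missing-node"); err != nil {
		fmt.Println(err)
	}
}

Note the two invalidation granularities in the patch: getVmssVM drops the whole vmssVirtualMachinesKey entry to force a relist, while deleteCacheForNode evicts only a single node from the sync.Map. The sketch models the first, coarser path.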