Fix aggressive VM calls for Azure VMSS

Pengfei Ni 2019-09-25 07:59:17 +00:00
parent 354a812086
commit 197892d2db
4 changed files with 175 additions and 182 deletions


@@ -86,8 +86,9 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
defer cancel()
// Invalidate the cache right after updating
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(vmName); err != nil {
return err
}
klog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
_, err = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "attach_disk")
@@ -157,8 +158,9 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
defer cancel()
// Invalidate the cache right after updating
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(vmName); err != nil {
return nil, err
}
klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
return ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "detach_disk")
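Both call sites above swap a per-VM cache key deletion for deleteCacheForNode, which evicts only the affected node from the shared node-keyed map introduced later in this commit. A minimal sketch of that eviction semantics, using a hypothetical entry type in place of vmssVirtualMachinesEntry:

package main

import (
	"fmt"
	"sync"
)

// entry stands in for vmssVirtualMachinesEntry: the fields needed to address
// one scale-set VM, cached under its node name.
type entry struct {
	vmssName   string
	instanceID string
}

func main() {
	vms := &sync.Map{} // [nodeName]*entry, as in the new cache
	vms.Store("vmssee6c2000000", &entry{vmssName: "vmss", instanceID: "0"})
	vms.Store("vmssee6c2000001", &entry{vmssName: "vmss", instanceID: "1"})

	// deleteCacheForNode semantics: only the updated node is evicted; the
	// rest of the fleet stays cached, so no extra list call is needed.
	vms.Delete("vmssee6c2000000")

	_, ok0 := vms.Load("vmssee6c2000000")
	_, ok1 := vms.Load("vmssee6c2000001")
	fmt.Println(ok0, ok1) // false true
}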


@@ -25,6 +25,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
@@ -60,10 +61,8 @@ type scaleSet struct {
// (e.g. master nodes) may not belong to any scale sets.
availabilitySet VMSet
vmssCache *timedCache
vmssVMCache *timedCache
nodeNameToScaleSetMappingCache *timedCache
availabilitySetNodesCache *timedCache
vmssVMCache *timedCache
availabilitySetNodesCache *timedCache
}
// newScaleSet creates a new scaleSet.
@@ -74,22 +73,12 @@ func newScaleSet(az *Cloud) (VMSet, error) {
availabilitySet: newAvailabilitySet(az),
}
ss.nodeNameToScaleSetMappingCache, err = ss.newNodeNameToScaleSetMappingCache()
if err != nil {
return nil, err
}
ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
if err != nil {
return nil, err
}
ss.vmssCache, err = ss.newVmssCache()
if err != nil {
return nil, err
}
ss.vmssVMCache, err = ss.newVmssVMCache()
ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache()
if err != nil {
return nil, err
}
@@ -99,39 +88,46 @@ func newScaleSet(az *Cloud) (VMSet, error) {
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm compute.VirtualMachineScaleSetVM, err error) {
instanceID, err = getScaleSetVMInstanceID(nodeName)
func (ss *scaleSet) getVmssVM(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
getter := func(nodeName string) (string, string, *compute.VirtualMachineScaleSetVM, error) {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
if err != nil {
return "", "", nil, err
}
virtualMachines := cached.(*sync.Map)
if vm, ok := virtualMachines.Load(nodeName); ok {
result := vm.(*vmssVirtualMachinesEntry)
return result.vmssName, result.instanceID, result.virtualMachine, nil
}
return "", "", nil, nil
}
_, err := getScaleSetVMInstanceID(nodeName)
if err != nil {
return ssName, instanceID, vm, err
return "", "", nil, err
}
ssName, err = ss.getScaleSetNameByNodeName(nodeName)
vmssName, instanceID, vm, err := getter(nodeName)
if err != nil {
return ssName, instanceID, vm, err
return "", "", nil, err
}
if vm != nil {
return vmssName, instanceID, vm, nil
}
if ssName == "" {
return "", "", vm, cloudprovider.InstanceNotFound
}
resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
klog.V(3).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName)
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
vmssName, instanceID, vm, err = getter(nodeName)
if err != nil {
return "", "", vm, err
return "", "", nil, err
}
klog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
key := buildVmssCacheKey(resourceGroup, ss.makeVmssVMName(ssName, instanceID))
cachedVM, err := ss.vmssVMCache.Get(key)
if err != nil {
return ssName, instanceID, vm, err
if vm == nil {
return "", "", nil, cloudprovider.InstanceNotFound
}
if cachedVM == nil {
klog.Errorf("Can't find node (%q) in any scale sets", nodeName)
return ssName, instanceID, vm, cloudprovider.InstanceNotFound
}
return ssName, instanceID, *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
return vmssName, instanceID, vm, nil
}
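getVmssVM now reads from a single bulk-populated map and allows itself exactly one forced refresh on a miss before returning cloudprovider.InstanceNotFound. A self-contained sketch of that miss-then-refresh idiom (names are hypothetical; the real timedCache re-runs its getter after Delete):

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotFound = errors.New("instance not found")

// listCache mimics the single-key timedCache: Get lazily runs the expensive
// bulk lister; Delete forces the next Get to list again.
type listCache struct {
	mu   sync.Mutex
	data *sync.Map
	list func() *sync.Map
}

func (c *listCache) Get() *sync.Map {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.data == nil {
		c.data = c.list()
	}
	return c.data
}

func (c *listCache) Delete() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data = nil
}

// lookup mirrors getVmssVM: a cheap cached read first, then one forced
// refresh for nodes (e.g. freshly scaled-up ones) missing from the snapshot.
func lookup(c *listCache, nodeName string) (string, error) {
	if v, ok := c.Get().Load(nodeName); ok {
		return v.(string), nil
	}
	c.Delete()
	if v, ok := c.Get().Load(nodeName); ok {
		return v.(string), nil
	}
	return "", errNotFound
}

func main() {
	lists := 0
	c := &listCache{list: func() *sync.Map {
		lists++
		m := &sync.Map{}
		m.Store("vmssee6c2000000", "vmss/0")
		if lists > 1 {
			m.Store("vmssee6c2000001", "vmss/1") // appears after a scale-up
		}
		return m
	}}
	fmt.Println(lookup(c, "vmssee6c2000000")) // hit, no refresh
	fmt.Println(lookup(c, "vmssee6c2000001")) // miss, refresh, hit
	fmt.Println("bulk lists issued:", lists)  // 2
}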
// GetPowerStatusByNodeName returns the power state of the specified node.
@@ -158,20 +154,49 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
// The node must belong to one of scale sets.
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
key := buildVmssCacheKey(resourceGroup, vmName)
cachedVM, err := ss.vmssVMCache.Get(key)
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (*compute.VirtualMachineScaleSetVM, error) {
getter := func() (vm *compute.VirtualMachineScaleSetVM, found bool, err error) {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
if err != nil {
return nil, false, err
}
virtualMachines := cached.(*sync.Map)
virtualMachines.Range(func(key, value interface{}) bool {
vmEntry := value.(*vmssVirtualMachinesEntry)
if strings.EqualFold(vmEntry.resourceGroup, resourceGroup) &&
strings.EqualFold(vmEntry.vmssName, scaleSetName) &&
strings.EqualFold(vmEntry.instanceID, instanceID) {
vm = vmEntry.virtualMachine
found = true
return false
}
return true
})
return vm, found, nil
}
vm, found, err := getter()
if err != nil {
return vm, err
return nil, err
}
if found {
return vm, nil
}
if cachedVM == nil {
klog.Errorf("couldn't find vmss virtual machine by scaleSetName (%s) and instanceID (%s)", scaleSetName, instanceID)
return vm, cloudprovider.InstanceNotFound
klog.V(3).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID)
ss.vmssVMCache.Delete(vmssVirtualMachinesKey)
vm, found, err = getter()
if err != nil {
return nil, err
}
if !found {
return nil, cloudprovider.InstanceNotFound
}
return *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
return vm, nil
}
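getVmssVMByInstanceID scans the node-keyed map with sync.Map.Range; returning false from the callback stops the scan at the first match. A minimal illustration of that early-exit contract:

package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	var vms sync.Map // [nodeName]instanceID, a stand-in for the real entry values
	vms.Store("vmssee6c2000000", "0")
	vms.Store("vmssee6c2000001", "1")
	vms.Store("vmssee6c2000002", "2")

	var matched string
	vms.Range(func(key, value interface{}) bool {
		if strings.EqualFold(value.(string), "1") {
			matched = key.(string)
			return false // false stops Range, like the early return above
		}
		return true // true keeps iterating
	})
	fmt.Println("matched node:", matched)
}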
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
@@ -463,9 +488,15 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
return nil, err
}
ssNames := make([]string, len(allScaleSets))
for i := range allScaleSets {
ssNames[i] = *(allScaleSets[i].Name)
ssNames := make([]string, 0)
for _, vmss := range allScaleSets {
name := *vmss.Name
if vmss.Sku != nil && to.Int64(vmss.Sku.Capacity) == 0 {
klog.V(3).Infof("Capacity of VMSS %q is 0, skipping", name)
continue
}
ssNames = append(ssNames, name)
}
return ssNames, nil
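The capacity check above leans on the go-autorest to helpers, which dereference pointers with a zero-value fallback, so a VMSS whose Sku.Capacity is unset is skipped just like one explicitly scaled to zero. For instance:

package main

import (
	"fmt"

	"github.com/Azure/go-autorest/autorest/to"
)

func main() {
	// to.Int64 returns 0 for a nil pointer instead of panicking.
	var capacity *int64
	fmt.Println(to.Int64(capacity))       // 0
	fmt.Println(to.Int64(to.Int64Ptr(3))) // 3
}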
@@ -500,7 +531,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
}
nodeName := nodes[nx].Name
ssName, err := ss.getScaleSetNameByNodeName(nodeName)
ssName, _, _, err := ss.getVmssVM(nodeName)
if err != nil {
return nil, err
}
@@ -599,7 +630,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
return network.Interface{}, err
}
primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
primaryInterfaceID, err := ss.getPrimaryInterfaceID(*vm)
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getPrimaryInterfaceID(), err=%v", nodeName, err)
return network.Interface{}, err
@@ -816,8 +847,9 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
}
// Invalidate the cache since we would update it.
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(vmName); err != nil {
return err
}
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()
@@ -1094,8 +1126,9 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
}
// Invalidate the cache since we would update it.
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
defer ss.vmssVMCache.Delete(key)
if err = ss.deleteCacheForNode(nodeName); err != nil {
return err
}
// Update vmssVM with backoff.
ctx, cancel := getContextWithCancel()


@@ -21,9 +21,12 @@ package azure
import (
"fmt"
"strings"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/go-autorest/autorest/to"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"
)
@@ -32,18 +35,19 @@ var (
vmssNameSeparator = "_"
vmssCacheSeparator = "#"
nodeNameToScaleSetMappingKey = "k8sNodeNameToScaleSetMappingKey"
availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
vmssVirtualMachinesKey = "k8svmssVirtualMachinesKey"
availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
vmssCacheTTL = time.Minute
vmssVMCacheTTL = time.Minute
availabilitySetNodesCacheTTL = 5 * time.Minute
nodeNameToScaleSetMappingCacheTTL = 5 * time.Minute
availabilitySetNodesCacheTTL = 15 * time.Minute
vmssVirtualMachinesTTL = 10 * time.Minute
)
// nodeNameToScaleSetMapping maps nodeName to scaleSet name.
// The map is required because vmss nodeName is not equal to its vmName.
type nodeNameToScaleSetMapping map[string]string
type vmssVirtualMachinesEntry struct {
resourceGroup string
vmssName string
instanceID string
virtualMachine *compute.VirtualMachineScaleSetVM
}
func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string {
return fmt.Sprintf("%s%s%s", scaleSetName, vmssNameSeparator, instanceID)
@@ -63,32 +67,9 @@ func extractVmssVMName(name string) (string, string, error) {
return ssName, instanceID, nil
}
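For reference, the composite VM name built by makeVmssVMName is scaleSetName_instanceID, and extractVmssVMName (body unchanged, so elided from this diff) reverses it. A round-trip sketch, assuming the split happens at the last separator since scale-set names may themselves contain underscores:

package main

import (
	"fmt"
	"strings"
)

const vmssNameSeparator = "_"

func main() {
	// makeVmssVMName: "<scaleSetName>_<instanceID>"
	vmName := fmt.Sprintf("%s%s%s", "agentpool1_vmss", vmssNameSeparator, "5")

	// Assumed inverse: split at the last separator so underscores inside
	// the scale-set name survive the round trip.
	idx := strings.LastIndex(vmName, vmssNameSeparator)
	fmt.Println(vmName[:idx], vmName[idx+1:]) // agentpool1_vmss 5
}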
// vmssCache only holds vmss from ss.ResourceGroup because nodes from other resourceGroups
// will be excluded from LB backends.
func (ss *scaleSet) newVmssCache() (*timedCache, error) {
func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) {
getter := func(key string) (interface{}, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
result, err := ss.VirtualMachineScaleSetsClient.Get(ctx, ss.ResourceGroup, key)
exists, message, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr
}
if !exists {
klog.V(2).Infof("Virtual machine scale set %q not found with message: %q", key, message)
return nil, nil
}
return &result, nil
}
return newTimedcache(vmssCacheTTL, getter)
}
func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
getter := func(key string) (interface{}, error) {
localCache := make(nodeNameToScaleSetMapping)
localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry
allResourceGroups, err := ss.GetResourceGroups()
if err != nil {
@@ -107,14 +88,20 @@ func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
return nil, err
}
for _, vm := range vms {
for i := range vms {
vm := vms[i]
if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
klog.Warningf("failed to get computerName for vmssVM (%q)", ssName)
continue
}
computerName := strings.ToLower(*vm.OsProfile.ComputerName)
localCache[computerName] = ssName
localCache.Store(computerName, &vmssVirtualMachinesEntry{
resourceGroup: resourceGroup,
vmssName: ssName,
instanceID: to.String(vm.InstanceID),
virtualMachine: &vm,
})
}
}
}
@@ -122,7 +109,18 @@ func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
return localCache, nil
}
return newTimedcache(nodeNameToScaleSetMappingCacheTTL, getter)
return newTimedcache(vmssVirtualMachinesTTL, getter)
}
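timedCache itself is not part of this diff; as a working model, a getter-backed cache with TTL expiry and explicit Delete might look like the following sketch (field and method names assumed to match the calls above):

package main

import (
	"fmt"
	"sync"
	"time"
)

type timedEntry struct {
	value   interface{}
	created time.Time
}

// timedCache sketch: Get serves entries younger than the TTL and re-runs the
// getter otherwise; Delete forces a refetch on the next Get.
type timedCache struct {
	mu     sync.Mutex
	ttl    time.Duration
	getter func(key string) (interface{}, error)
	data   map[string]timedEntry
}

func newTimedcache(ttl time.Duration, getter func(string) (interface{}, error)) *timedCache {
	return &timedCache{ttl: ttl, getter: getter, data: map[string]timedEntry{}}
}

func (c *timedCache) Get(key string) (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.data[key]; ok && time.Since(e.created) < c.ttl {
		return e.value, nil
	}
	v, err := c.getter(key)
	if err != nil {
		return nil, err
	}
	c.data[key] = timedEntry{value: v, created: time.Now()}
	return v, nil
}

func (c *timedCache) Delete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.data, key)
}

func main() {
	fetches := 0
	cache := newTimedcache(10*time.Minute, func(key string) (interface{}, error) {
		fetches++
		return fmt.Sprintf("payload-for-%s", key), nil
	})
	cache.Get("k8svmssVirtualMachinesKey") // getter runs
	cache.Get("k8svmssVirtualMachinesKey") // served from cache
	cache.Delete("k8svmssVirtualMachinesKey")
	cache.Get("k8svmssVirtualMachinesKey")      // getter runs again
	fmt.Println("getter invocations:", fetches) // 3
}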
func (ss *scaleSet) deleteCacheForNode(nodeName string) error {
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
if err != nil {
return err
}
virtualMachines := cached.(*sync.Map)
virtualMachines.Delete(nodeName)
return nil
}
func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
@@ -152,91 +150,6 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
return newTimedcache(availabilitySetNodesCacheTTL, getter)
}
func buildVmssCacheKey(resourceGroup, name string) string {
// key is composed of <resourceGroup>#<vmName>
return fmt.Sprintf("%s%s%s", strings.ToLower(resourceGroup), vmssCacheSeparator, name)
}
func extractVmssCacheKey(key string) (string, string, error) {
// key is composed of <resourceGroup>#<vmName>
keyItems := strings.Split(key, vmssCacheSeparator)
if len(keyItems) != 2 {
return "", "", fmt.Errorf("key %q is not in format '<resourceGroup>#<vmName>'", key)
}
resourceGroup := keyItems[0]
vmName := keyItems[1]
return resourceGroup, vmName, nil
}
func (ss *scaleSet) newVmssVMCache() (*timedCache, error) {
getter := func(key string) (interface{}, error) {
// key is composed of <resourceGroup>#<vmName>
resourceGroup, vmName, err := extractVmssCacheKey(key)
if err != nil {
return nil, err
}
// vmName's format is 'scaleSetName_instanceID'
ssName, instanceID, err := extractVmssVMName(vmName)
if err != nil {
return nil, err
}
// Not found, the VM doesn't belong to any known scale sets.
if ssName == "" {
return nil, nil
}
ctx, cancel := getContextWithCancel()
defer cancel()
result, err := ss.VirtualMachineScaleSetVMsClient.Get(ctx, resourceGroup, ssName, instanceID, compute.InstanceView)
exists, message, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr
}
if !exists {
klog.V(2).Infof("Virtual machine scale set VM %q not found with message: %q", key, message)
return nil, nil
}
return &result, nil
}
return newTimedcache(vmssVMCacheTTL, getter)
}
func (ss *scaleSet) getScaleSetNameByNodeName(nodeName string) (string, error) {
getScaleSetName := func(nodeName string) (string, error) {
nodeNameMapping, err := ss.nodeNameToScaleSetMappingCache.Get(nodeNameToScaleSetMappingKey)
if err != nil {
return "", err
}
realMapping := nodeNameMapping.(nodeNameToScaleSetMapping)
if ssName, ok := realMapping[nodeName]; ok {
return ssName, nil
}
return "", nil
}
ssName, err := getScaleSetName(nodeName)
if err != nil {
return "", err
}
if ssName != "" {
return ssName, nil
}
// ssName is still not found, it is likely that new Nodes are created.
// Force refresh the cache and try again.
ss.nodeNameToScaleSetMappingCache.Delete(nodeNameToScaleSetMappingKey)
return getScaleSetName(nodeName)
}
func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string) (bool, error) {
cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey)
if err != nil {


@@ -19,8 +19,11 @@ limitations under the License.
package azure
import (
"context"
"sync"
"testing"
"github.com/Azure/go-autorest/autorest/to"
"github.com/stretchr/testify/assert"
)
@@ -67,3 +70,45 @@ func TestExtractVmssVMName(t *testing.T) {
assert.Equal(t, c.expectedInstanceID, instanceID, c.description)
}
}
func TestVMSSVMCache(t *testing.T) {
vmssName := "vmss"
vmList := []string{"vmssee6c2000000", "vmssee6c2000001", "vmssee6c2000002"}
ss, err := newTestScaleSet(vmssName, "", 0, vmList)
assert.NoError(t, err)
// validate getting VMSS VM via cache.
virtualMachines, err := ss.VirtualMachineScaleSetVMsClient.List(
context.Background(), "rg", "vmss", "", "", "")
assert.NoError(t, err)
assert.Equal(t, 3, len(virtualMachines))
for i := range virtualMachines {
vm := virtualMachines[i]
vmName := to.String(vm.OsProfile.ComputerName)
ssName, instanceID, realVM, err := ss.getVmssVM(vmName)
assert.NoError(t, err)
assert.Equal(t, "vmss", ssName)
assert.Equal(t, to.String(vm.InstanceID), instanceID)
assert.Equal(t, &vm, realVM)
}
// validate deleteCacheForNode().
vm := virtualMachines[0]
vmName := to.String(vm.OsProfile.ComputerName)
err = ss.deleteCacheForNode(vmName)
assert.NoError(t, err)
// the VM should be removed from cache after deleteCacheForNode().
cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey)
assert.NoError(t, err)
cachedVirtualMachines := cached.(*sync.Map)
_, ok := cachedVirtualMachines.Load(vmName)
assert.Equal(t, false, ok)
// the VM should come back after another cache refresh.
ssName, instanceID, realVM, err := ss.getVmssVM(vmName)
assert.NoError(t, err)
assert.Equal(t, "vmss", ssName)
assert.Equal(t, to.String(vm.InstanceID), instanceID)
assert.Equal(t, &vm, realVM)
}
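One subtlety in the assertions above: testify's assert.Equal compares values with reflect.DeepEqual, and DeepEqual on two pointers is true when the pointed-to values are deeply equal, so assert.Equal(t, &vm, realVM) does not require the cache to return the identical pointer, only an equal VM. A quick illustration:

package main

import (
	"fmt"
	"reflect"
)

type vm struct{ instanceID string }

func main() {
	a := &vm{instanceID: "0"}
	b := &vm{instanceID: "0"} // distinct allocation, same contents
	fmt.Println(a == b)                  // false: different pointers
	fmt.Println(reflect.DeepEqual(a, b)) // true: deep equality, what assert.Equal checks
}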