mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Use generic cache for vmss
This commit is contained in:
parent
f0e573d6d5
commit
4d5e7b7cfb
@ -244,7 +244,10 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if strings.EqualFold(vmTypeVMSS, az.Config.VMType) {
|
if strings.EqualFold(vmTypeVMSS, az.Config.VMType) {
|
||||||
az.vmSet = newScaleSet(&az)
|
az.vmSet, err = newScaleSet(&az)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
az.vmSet = newAvailabilitySet(&az)
|
az.vmSet = newAvailabilitySet(&az)
|
||||||
}
|
}
|
||||||
|
@ -23,8 +23,6 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/Azure/azure-sdk-for-go/arm/compute"
|
"github.com/Azure/azure-sdk-for-go/arm/compute"
|
||||||
"github.com/Azure/azure-sdk-for-go/arm/network"
|
"github.com/Azure/azure-sdk-for-go/arm/network"
|
||||||
@ -33,7 +31,6 @@ import (
|
|||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
)
|
)
|
||||||
@ -45,26 +42,6 @@ var (
|
|||||||
scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
|
scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
|
||||||
)
|
)
|
||||||
|
|
||||||
// scaleSetVMInfo includes basic information of a virtual machine.
|
|
||||||
type scaleSetVMInfo struct {
|
|
||||||
// The ID of the machine.
|
|
||||||
ID string
|
|
||||||
// Instance ID of the machine (only for scale sets vm).
|
|
||||||
InstanceID string
|
|
||||||
// Node name of the machine.
|
|
||||||
NodeName string
|
|
||||||
// Set name of the machine.
|
|
||||||
ScaleSetName string
|
|
||||||
// The type of the machine.
|
|
||||||
Type string
|
|
||||||
// The region of the machine.
|
|
||||||
Region string
|
|
||||||
// Primary interface ID of the machine.
|
|
||||||
PrimaryInterfaceID string
|
|
||||||
// Fault domain of the machine.
|
|
||||||
FaultDomain string
|
|
||||||
}
|
|
||||||
|
|
||||||
// scaleSet implements VMSet interface for Azure scale set.
|
// scaleSet implements VMSet interface for Azure scale set.
|
||||||
type scaleSet struct {
|
type scaleSet struct {
|
||||||
*Cloud
|
*Cloud
|
||||||
@ -73,207 +50,123 @@ type scaleSet struct {
|
|||||||
// (e.g. master nodes) may not belong to any scale sets.
|
// (e.g. master nodes) may not belong to any scale sets.
|
||||||
availabilitySet VMSet
|
availabilitySet VMSet
|
||||||
|
|
||||||
cacheMutex sync.Mutex
|
vmssCache *timedCache
|
||||||
// A local cache of scale sets. The key is scale set name and the value is a
|
vmssVMCache *timedCache
|
||||||
// list of virtual machines belonging to the scale set.
|
nodeNameToScaleSetMappingCache *timedCache
|
||||||
cache map[string][]scaleSetVMInfo
|
availabilitySetNodesCache *timedCache
|
||||||
availabilitySetNodesCache sets.String
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newScaleSet creates a new scaleSet.
|
// newScaleSet creates a new scaleSet.
|
||||||
func newScaleSet(az *Cloud) VMSet {
|
func newScaleSet(az *Cloud) (VMSet, error) {
|
||||||
|
var err error
|
||||||
ss := &scaleSet{
|
ss := &scaleSet{
|
||||||
Cloud: az,
|
Cloud: az,
|
||||||
availabilitySet: newAvailabilitySet(az),
|
availabilitySet: newAvailabilitySet(az),
|
||||||
availabilitySetNodesCache: sets.NewString(),
|
|
||||||
cache: make(map[string][]scaleSetVMInfo),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go wait.Until(func() {
|
ss.nodeNameToScaleSetMappingCache, err = ss.newNodeNameToScaleSetMappingCache()
|
||||||
ss.cacheMutex.Lock()
|
|
||||||
defer ss.cacheMutex.Unlock()
|
|
||||||
|
|
||||||
if err := ss.updateCache(); err != nil {
|
|
||||||
glog.Errorf("updateCache failed: %v", err)
|
|
||||||
}
|
|
||||||
}, 5*time.Minute, wait.NeverStop)
|
|
||||||
|
|
||||||
return ss
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateCache updates scale sets cache. It should be called within a lock.
|
|
||||||
func (ss *scaleSet) updateCache() error {
|
|
||||||
scaleSetNames, err := ss.listScaleSetsWithRetry()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
localCache := make(map[string][]scaleSetVMInfo)
|
ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
|
||||||
for _, scaleSetName := range scaleSetNames {
|
if err != nil {
|
||||||
if _, ok := localCache[scaleSetName]; !ok {
|
return nil, err
|
||||||
localCache[scaleSetName] = make([]scaleSetVMInfo, 0)
|
|
||||||
}
|
|
||||||
vms, err := ss.listScaleSetVMsWithRetry(scaleSetName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, vm := range vms {
|
|
||||||
nodeName := ""
|
|
||||||
if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil {
|
|
||||||
nodeName = strings.ToLower(*vm.OsProfile.ComputerName)
|
|
||||||
}
|
|
||||||
|
|
||||||
vmSize := ""
|
|
||||||
if vm.Sku != nil && vm.Sku.Name != nil {
|
|
||||||
vmSize = *vm.Sku.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("getPrimaryInterfaceID for %s failed: %v", nodeName, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
faultDomain := ""
|
|
||||||
if vm.InstanceView != nil && vm.InstanceView.PlatformFaultDomain != nil {
|
|
||||||
faultDomain = strconv.Itoa(int(*vm.InstanceView.PlatformFaultDomain))
|
|
||||||
}
|
|
||||||
|
|
||||||
localCache[scaleSetName] = append(localCache[scaleSetName], scaleSetVMInfo{
|
|
||||||
ID: *vm.ID,
|
|
||||||
Type: vmSize,
|
|
||||||
NodeName: nodeName,
|
|
||||||
FaultDomain: faultDomain,
|
|
||||||
ScaleSetName: scaleSetName,
|
|
||||||
Region: *vm.Location,
|
|
||||||
InstanceID: *vm.InstanceID,
|
|
||||||
PrimaryInterfaceID: primaryInterfaceID,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only update cache after all steps are success.
|
ss.vmssCache, err = ss.newVmssCache()
|
||||||
ss.cache = localCache
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
ss.vmssVMCache, err = ss.newVmssVMCache()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getCachedVirtualMachine gets virtualMachine by nodeName from cache.
|
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
|
||||||
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
|
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
|
||||||
func (ss *scaleSet) getCachedVirtualMachine(nodeName string) (scaleSetVMInfo, error) {
|
func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm compute.VirtualMachineScaleSetVM, err error) {
|
||||||
ss.cacheMutex.Lock()
|
|
||||||
defer ss.cacheMutex.Unlock()
|
|
||||||
|
|
||||||
getVMFromCache := func(nodeName string) (scaleSetVMInfo, bool) {
|
|
||||||
glog.V(8).Infof("Getting scaleSetVMInfo for %q from cache %v", nodeName, ss.cache)
|
|
||||||
for scaleSetName := range ss.cache {
|
|
||||||
for _, vm := range ss.cache[scaleSetName] {
|
|
||||||
if vm.NodeName == nodeName {
|
|
||||||
return vm, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return scaleSetVMInfo{}, false
|
|
||||||
}
|
|
||||||
|
|
||||||
vm, found := getVMFromCache(nodeName)
|
|
||||||
if found {
|
|
||||||
return vm, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Known node not managed by scale sets.
|
// Known node not managed by scale sets.
|
||||||
if ss.availabilitySetNodesCache.Has(nodeName) {
|
managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName)
|
||||||
glog.V(10).Infof("Found node %q in availabilitySetNodesCache", nodeName)
|
if err != nil {
|
||||||
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
|
return "", "", vm, err
|
||||||
|
}
|
||||||
|
if managedByAS {
|
||||||
|
glog.V(8).Infof("Found node %q in availabilitySetNodesCache", nodeName)
|
||||||
|
return "", "", vm, ErrorNotVmssInstance
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update cache and try again.
|
instanceID, err = getScaleSetVMInstanceID(nodeName)
|
||||||
glog.V(10).Infof("vmss cache before updateCache: %v", ss.cache)
|
if err != nil {
|
||||||
if err := ss.updateCache(); err != nil {
|
return ssName, instanceID, vm, err
|
||||||
glog.Errorf("updateCache failed with error: %v", err)
|
|
||||||
return scaleSetVMInfo{}, err
|
|
||||||
}
|
|
||||||
glog.V(10).Infof("vmss cache after updateCache: %v", ss.cache)
|
|
||||||
vm, found = getVMFromCache(nodeName)
|
|
||||||
if found {
|
|
||||||
return vm, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node still not found, assuming it is not managed by scale sets.
|
ssName, err = ss.getScaleSetNameByNodeName(nodeName)
|
||||||
glog.V(8).Infof("Node %q doesn't belong to any scale sets, adding it to availabilitySetNodesCache", nodeName)
|
if err != nil {
|
||||||
ss.availabilitySetNodesCache.Insert(nodeName)
|
return ssName, instanceID, vm, err
|
||||||
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
|
}
|
||||||
|
|
||||||
|
if ssName == "" {
|
||||||
|
return "", "", vm, cloudprovider.InstanceNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
|
||||||
|
cachedVM, err := ss.vmssVMCache.Get(ss.makeVmssVMName(ssName, instanceID))
|
||||||
|
if err != nil {
|
||||||
|
return ssName, instanceID, vm, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cachedVM == nil {
|
||||||
|
glog.Errorf("Can't find node (%q) in any scale sets", nodeName)
|
||||||
|
return ssName, instanceID, vm, cloudprovider.InstanceNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return ssName, instanceID, *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
|
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
|
||||||
// The node must belong to one of scale sets.
|
// The node must belong to one of scale sets.
|
||||||
func (ss *scaleSet) getCachedVirtualMachineByInstanceID(scaleSetName, instanceID string) (scaleSetVMInfo, error) {
|
func (ss *scaleSet) getVmssVMByInstanceID(scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
|
||||||
ss.cacheMutex.Lock()
|
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
|
||||||
defer ss.cacheMutex.Unlock()
|
cachedVM, err := ss.vmssVMCache.Get(vmName)
|
||||||
|
if err != nil {
|
||||||
getVMByID := func(scaleSetName, instanceID string) (scaleSetVMInfo, bool) {
|
return vm, err
|
||||||
glog.V(8).Infof("Getting scaleSetVMInfo with scaleSetName: %q and instanceID %q from cache %v", scaleSetName, instanceID, ss.cache)
|
|
||||||
vms, ok := ss.cache[scaleSetName]
|
|
||||||
if !ok {
|
|
||||||
glog.V(4).Infof("scale set (%s) not found", scaleSetName)
|
|
||||||
return scaleSetVMInfo{}, false
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, vm := range vms {
|
|
||||||
if vm.InstanceID == instanceID {
|
|
||||||
glog.V(4).Infof("getCachedVirtualMachineByInstanceID gets vm (%s) by instanceID (%s) within scale set (%s)", vm.NodeName, instanceID, scaleSetName)
|
|
||||||
return vm, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(4).Infof("instanceID (%s) not found in scale set (%s)", instanceID, scaleSetName)
|
|
||||||
return scaleSetVMInfo{}, false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vm, found := getVMByID(scaleSetName, instanceID)
|
if cachedVM == nil {
|
||||||
if found {
|
glog.Errorf("cound't find vmss virtual machine by scaleSetName (%q) and instanceID (%q)", scaleSetName, instanceID)
|
||||||
return vm, nil
|
return vm, cloudprovider.InstanceNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update cache and try again.
|
return *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
|
||||||
if err := ss.updateCache(); err != nil {
|
|
||||||
glog.Errorf("updateCache failed with error: %v", err)
|
|
||||||
return scaleSetVMInfo{}, err
|
|
||||||
}
|
|
||||||
vm, found = getVMByID(scaleSetName, instanceID)
|
|
||||||
if found {
|
|
||||||
return vm, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
|
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
|
||||||
// It must return ("", cloudprovider.InstanceNotFound) if the instance does
|
// It must return ("", cloudprovider.InstanceNotFound) if the instance does
|
||||||
// not exist or is no longer running.
|
// not exist or is no longer running.
|
||||||
func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
|
func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
|
||||||
vm, err := ss.getCachedVirtualMachine(name)
|
_, _, vm, err := ss.getVmssVM(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == cloudprovider.InstanceNotFound {
|
if err == ErrorNotVmssInstance {
|
||||||
glog.V(4).Infof("GetInstanceIDByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name)
|
glog.V(4).Infof("GetInstanceIDByNodeName: node %q is managed by availability set", name)
|
||||||
|
// Retry with standard type because nodes are not managed by vmss.
|
||||||
// Retry with standard type because master nodes may not belong to any vmss.
|
|
||||||
// TODO: find a better way to identify the type of VM.
|
|
||||||
return ss.availabilitySet.GetInstanceIDByNodeName(name)
|
return ss.availabilitySet.GetInstanceIDByNodeName(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return vm.ID, nil
|
return *vm.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodeNameByProviderID gets the node name by provider ID.
|
// GetNodeNameByProviderID gets the node name by provider ID.
|
||||||
func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
|
func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
|
||||||
// NodeName is not part of providerID for vmss instances.
|
// NodeName is not part of providerID for vmss instances.
|
||||||
scaleSetName, err := extractScaleSetNameByVMID(providerID)
|
scaleSetName, err := extractScaleSetNameByExternalID(providerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("Can not extract scale set name from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
|
glog.V(4).Infof("Can not extract scale set name from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
|
||||||
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
||||||
@ -285,49 +178,60 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
|
|||||||
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
vm, err := ss.getCachedVirtualMachineByInstanceID(scaleSetName, instanceID)
|
vm, err := ss.getVmssVMByInstanceID(scaleSetName, instanceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return types.NodeName(vm.NodeName), nil
|
if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil {
|
||||||
|
nodeName := strings.ToLower(*vm.OsProfile.ComputerName)
|
||||||
|
return types.NodeName(nodeName), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetInstanceTypeByNodeName gets the instance type by node name.
|
// GetInstanceTypeByNodeName gets the instance type by node name.
|
||||||
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
|
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
|
||||||
vm, err := ss.getCachedVirtualMachine(name)
|
_, _, vm, err := ss.getVmssVM(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == cloudprovider.InstanceNotFound {
|
if err == ErrorNotVmssInstance {
|
||||||
glog.V(4).Infof("GetInstanceTypeByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name)
|
glog.V(4).Infof("GetInstanceTypeByNodeName: node %q is managed by availability set", name)
|
||||||
|
// Retry with standard type because nodes are not managed by vmss.
|
||||||
// Retry with standard type because master nodes may not belong to any vmss.
|
|
||||||
// TODO: find a better way to identify the type of VM.
|
|
||||||
return ss.availabilitySet.GetInstanceTypeByNodeName(name)
|
return ss.availabilitySet.GetInstanceTypeByNodeName(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return vm.Type, nil
|
if vm.Sku != nil && vm.Sku.Name != nil {
|
||||||
|
return *vm.Sku.Name, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetZoneByNodeName gets cloudprovider.Zone by node name.
|
// GetZoneByNodeName gets cloudprovider.Zone by node name.
|
||||||
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
|
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
|
||||||
vm, err := ss.getCachedVirtualMachine(name)
|
_, _, vm, err := ss.getVmssVM(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == cloudprovider.InstanceNotFound {
|
if err == ErrorNotVmssInstance {
|
||||||
glog.V(4).Infof("GetZoneByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name)
|
glog.V(4).Infof("GetZoneByNodeName: node %q is managed by availability set", name)
|
||||||
// Retry with standard type because master nodes may not belong to any vmss.
|
// Retry with standard type because nodes are not managed by vmss.
|
||||||
// TODO: find a better way to identify the type of VM.
|
|
||||||
return ss.availabilitySet.GetZoneByNodeName(name)
|
return ss.availabilitySet.GetZoneByNodeName(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
return cloudprovider.Zone{}, err
|
return cloudprovider.Zone{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cloudprovider.Zone{
|
if vm.InstanceView != nil && vm.InstanceView.PlatformFaultDomain != nil {
|
||||||
FailureDomain: vm.FaultDomain,
|
return cloudprovider.Zone{
|
||||||
Region: vm.Region,
|
FailureDomain: strconv.Itoa(int(*vm.InstanceView.PlatformFaultDomain)),
|
||||||
}, nil
|
Region: *vm.Location,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return cloudprovider.Zone{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
|
// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
|
||||||
@ -386,9 +290,9 @@ func getScaleSetVMInstanceID(machineName string) (string, error) {
|
|||||||
return fmt.Sprintf("%d", instanceID), nil
|
return fmt.Sprintf("%d", instanceID), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractScaleSetNameByVMID extracts the scaleset name by scaleSetVirtualMachine's ID.
|
// extractScaleSetNameByExternalID extracts the scaleset name by node's externalID.
|
||||||
func extractScaleSetNameByVMID(vmID string) (string, error) {
|
func extractScaleSetNameByExternalID(externalID string) (string, error) {
|
||||||
matches := scaleSetNameRE.FindStringSubmatch(vmID)
|
matches := scaleSetNameRE.FindStringSubmatch(externalID)
|
||||||
if len(matches) != 2 {
|
if len(matches) != 2 {
|
||||||
return "", ErrorNotVmssInstance
|
return "", ErrorNotVmssInstance
|
||||||
}
|
}
|
||||||
@ -493,41 +397,20 @@ func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.Vir
|
|||||||
// getAgentPoolScaleSets lists the virtual machines for for the resource group and then builds
|
// getAgentPoolScaleSets lists the virtual machines for for the resource group and then builds
|
||||||
// a list of scale sets that match the nodes available to k8s.
|
// a list of scale sets that match the nodes available to k8s.
|
||||||
func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
|
func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
|
||||||
ss.cacheMutex.Lock()
|
|
||||||
defer ss.cacheMutex.Unlock()
|
|
||||||
|
|
||||||
// Always update cache to get latest lists of scale sets and virtual machines.
|
|
||||||
if err := ss.updateCache(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
vmNameToScaleSetName := make(map[string]string)
|
|
||||||
for scaleSetName := range ss.cache {
|
|
||||||
vms := ss.cache[scaleSetName]
|
|
||||||
for idx := range vms {
|
|
||||||
vm := vms[idx]
|
|
||||||
if vm.NodeName != "" {
|
|
||||||
vmNameToScaleSetName[vm.NodeName] = scaleSetName
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
agentPoolScaleSets := &[]string{}
|
agentPoolScaleSets := &[]string{}
|
||||||
availableScaleSetNames := sets.NewString()
|
|
||||||
for nx := range nodes {
|
for nx := range nodes {
|
||||||
if isMasterNode(nodes[nx]) {
|
if isMasterNode(nodes[nx]) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeName := nodes[nx].Name
|
nodeName := nodes[nx].Name
|
||||||
ssName, ok := vmNameToScaleSetName[nodeName]
|
ssName, err := ss.getScaleSetNameByNodeName(nodeName)
|
||||||
if !ok {
|
if err != nil {
|
||||||
// TODO: support master nodes not managed by VMSS.
|
return nil, err
|
||||||
glog.Errorf("Node %q is not belonging to any known scale sets", nodeName)
|
|
||||||
return nil, fmt.Errorf("node %q is not belonging to any known scale sets", nodeName)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if availableScaleSetNames.Has(ssName) {
|
if ssName == "" {
|
||||||
|
glog.V(3).Infof("Node %q is not belonging to any known scale sets", nodeName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -589,11 +472,11 @@ func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetN
|
|||||||
|
|
||||||
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
|
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
|
||||||
func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) {
|
func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) {
|
||||||
vm, err := ss.getCachedVirtualMachine(nodeName)
|
ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == cloudprovider.InstanceNotFound {
|
if err == ErrorNotVmssInstance {
|
||||||
// Retry with standard type because master nodes may not belong to any vmss.
|
glog.V(4).Infof("GetPrimaryInterface: node %q is managed by availability set", nodeName)
|
||||||
// TODO: find a better way to identify the type of VM.
|
// Retry with standard type because nodes are not managed by vmss.
|
||||||
return ss.availabilitySet.GetPrimaryInterface(nodeName, "")
|
return ss.availabilitySet.GetPrimaryInterface(nodeName, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -602,59 +485,55 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Int
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check scale set name.
|
// Check scale set name.
|
||||||
if vmSetName != "" && !strings.EqualFold(vm.ScaleSetName, vmSetName) {
|
if vmSetName != "" && !strings.EqualFold(ssName, vmSetName) {
|
||||||
return network.Interface{}, errNotInVMSet
|
return network.Interface{}, errNotInVMSet
|
||||||
}
|
}
|
||||||
|
|
||||||
nicName, err := getLastSegment(vm.PrimaryInterfaceID)
|
primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, vm.PrimaryInterfaceID, err)
|
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getPrimaryInterfaceID(), err=%v", nodeName, err)
|
||||||
return network.Interface{}, err
|
return network.Interface{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, vm.ScaleSetName, vm.InstanceID, nicName, "")
|
nicName, err := getLastSegment(primaryInterfaceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, vm.ScaleSetName, nicName, err)
|
glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, primaryInterfaceID, err)
|
||||||
|
return network.Interface{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, ssName, instanceID, nicName, "")
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, ssName, nicName, err)
|
||||||
return network.Interface{}, err
|
return network.Interface{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fix interface's location, which is required when updating the interface.
|
// Fix interface's location, which is required when updating the interface.
|
||||||
// TODO: is this a bug of azure SDK?
|
// TODO: is this a bug of azure SDK?
|
||||||
if nic.Location == nil || *nic.Location == "" {
|
if nic.Location == nil || *nic.Location == "" {
|
||||||
nic.Location = &vm.Region
|
nic.Location = vm.Location
|
||||||
}
|
}
|
||||||
|
|
||||||
return nic, nil
|
return nic, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getScaleSet gets a scale set by name.
|
|
||||||
func (ss *scaleSet) getScaleSet(name string) (compute.VirtualMachineScaleSet, bool, error) {
|
|
||||||
result, err := ss.VirtualMachineScaleSetsClient.Get(ss.ResourceGroup, name)
|
|
||||||
exists, realErr := checkResourceExistsFromError(err)
|
|
||||||
if realErr != nil {
|
|
||||||
return result, false, realErr
|
|
||||||
}
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
return result, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, exists, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// getScaleSetWithRetry gets scale set with exponential backoff retry
|
// getScaleSetWithRetry gets scale set with exponential backoff retry
|
||||||
func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineScaleSet, bool, error) {
|
func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineScaleSet, bool, error) {
|
||||||
var result compute.VirtualMachineScaleSet
|
var result compute.VirtualMachineScaleSet
|
||||||
var exists bool
|
var exists bool
|
||||||
|
|
||||||
err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
|
err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
|
||||||
var retryErr error
|
cached, retryErr := ss.vmssCache.Get(name)
|
||||||
result, exists, retryErr = ss.getScaleSet(name)
|
|
||||||
if retryErr != nil {
|
if retryErr != nil {
|
||||||
glog.Errorf("backoff: failure, will retry,err=%v", retryErr)
|
glog.Errorf("backoff: failure for scale set %q, will retry,err=%v", name, retryErr)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
glog.V(2).Info("backoff: success")
|
glog.V(4).Info("backoff: success for scale set %q", name)
|
||||||
|
|
||||||
|
if cached != nil {
|
||||||
|
exists = true
|
||||||
|
result = *(cached.(*compute.VirtualMachineScaleSet))
|
||||||
|
}
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -783,7 +662,7 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
|
|||||||
// Construct instanceIDs from nodes.
|
// Construct instanceIDs from nodes.
|
||||||
instanceIDs := []string{}
|
instanceIDs := []string{}
|
||||||
for _, curNode := range nodes {
|
for _, curNode := range nodes {
|
||||||
curScaleSetName, err := extractScaleSetNameByVMID(curNode.Spec.ExternalID)
|
curScaleSetName, err := extractScaleSetNameByExternalID(curNode.Spec.ExternalID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("Node %q is not belonging to any scale sets, omitting it", curNode.Name)
|
glog.V(4).Infof("Node %q is not belonging to any scale sets, omitting it", curNode.Name)
|
||||||
continue
|
continue
|
||||||
|
191
pkg/cloudprovider/providers/azure/azure_vmss_cache.go
Normal file
191
pkg/cloudprovider/providers/azure/azure_vmss_cache.go
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package azure
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
nodeNameToScaleSetMappingKey = "k8sNodeNameToScaleSetMappingKey"
|
||||||
|
availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
|
||||||
|
|
||||||
|
vmssCacheTTL = time.Minute
|
||||||
|
vmssVMCacheTTL = time.Minute
|
||||||
|
availabilitySetNodesCacheTTL = 15 * time.Minute
|
||||||
|
nodeNameToScaleSetMappingCacheTTL = 15 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
// nodeNameToScaleSetMapping maps nodeName to scaleSet name.
|
||||||
|
// The map is required because vmss nodeName is not equal to its vmName.
|
||||||
|
type nodeNameToScaleSetMapping map[string]string
|
||||||
|
|
||||||
|
func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string {
|
||||||
|
return fmt.Sprintf("%s_%s", scaleSetName, instanceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *scaleSet) extractVmssVMName(name string) (string, string, error) {
|
||||||
|
ret := strings.Split(name, "_")
|
||||||
|
if len(ret) != 2 {
|
||||||
|
glog.Errorf("Failed to extract vmssVMName %q", name)
|
||||||
|
return "", "", ErrorNotVmssInstance
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret[0], ret[1], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *scaleSet) newVmssCache() (*timedCache, error) {
|
||||||
|
getter := func(key string) (interface{}, error) {
|
||||||
|
result, err := ss.VirtualMachineScaleSetsClient.Get(ss.ResourceGroup, key)
|
||||||
|
exists, realErr := checkResourceExistsFromError(err)
|
||||||
|
if realErr != nil {
|
||||||
|
return nil, realErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return newTimedcache(vmssCacheTTL, getter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
|
||||||
|
getter := func(key string) (interface{}, error) {
|
||||||
|
scaleSetNames, err := ss.listScaleSetsWithRetry()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
localCache := make(nodeNameToScaleSetMapping)
|
||||||
|
for _, ssName := range scaleSetNames {
|
||||||
|
vms, err := ss.listScaleSetVMsWithRetry(ssName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, vm := range vms {
|
||||||
|
if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
|
||||||
|
glog.Warningf("failed to get computerName for vmssVM (%q)", *vm.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
computerName := strings.ToLower(*vm.OsProfile.ComputerName)
|
||||||
|
localCache[computerName] = ssName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return localCache, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return newTimedcache(nodeNameToScaleSetMappingCacheTTL, getter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
|
||||||
|
getter := func(key string) (interface{}, error) {
|
||||||
|
vmList, err := ss.Cloud.VirtualMachineClientListWithRetry()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
localCache := sets.NewString()
|
||||||
|
for _, vm := range vmList {
|
||||||
|
localCache.Insert(*vm.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return localCache, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return newTimedcache(availabilitySetNodesCacheTTL, getter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *scaleSet) newVmssVMCache() (*timedCache, error) {
|
||||||
|
getter := func(key string) (interface{}, error) {
|
||||||
|
// vmssVM name's format is 'scaleSetName_instanceID'
|
||||||
|
ssName, instanceID, err := ss.extractVmssVMName(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Not found, the VM doesn't belong to any known scale sets.
|
||||||
|
if ssName == "" {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, ssName, instanceID)
|
||||||
|
exists, realErr := checkResourceExistsFromError(err)
|
||||||
|
if realErr != nil {
|
||||||
|
return nil, realErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return newTimedcache(vmssVMCacheTTL, getter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *scaleSet) getScaleSetNameByNodeName(nodeName string) (string, error) {
|
||||||
|
getScaleSetName := func(nodeName string) (string, error) {
|
||||||
|
nodeNameMapping, err := ss.nodeNameToScaleSetMappingCache.Get(nodeNameToScaleSetMappingKey)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
realMapping := nodeNameMapping.(nodeNameToScaleSetMapping)
|
||||||
|
if ssName, ok := realMapping[nodeName]; ok {
|
||||||
|
return ssName, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ssName, err := getScaleSetName(nodeName)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ssName != "" {
|
||||||
|
return ssName, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ssName is still not found, it is likely that new Nodes are created.
|
||||||
|
// Force refresh the cache and try again.
|
||||||
|
ss.nodeNameToScaleSetMappingCache.Delete(nodeNameToScaleSetMappingKey)
|
||||||
|
return getScaleSetName(nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string) (bool, error) {
|
||||||
|
cached, err := ss.availabilitySetNodesCache.Get(availabilitySetNodesKey)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
availabilitySetNodes := cached.(sets.String)
|
||||||
|
return availabilitySetNodes.Has(nodeName), nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user