Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #57131 from feiskyer/azure-lb
Automatic merge from submit-queue (batch tested with PRs 57148, 57123, 57091, 57141, 57131). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Support LoadBalancer for Azure Virtual Machine Scale Sets

**What this PR does / why we need it**:

Continuation of #43287, this PR adds LoadBalancer support for Azure Virtual Machine Scale Sets. To achieve this, this PR also:

- Adds a general VMSet interface for VMSS and VMAS, so that we don't add lots of if-else blocks for the different logic
- Adds a scale sets implementation and an availability sets implementation of VMSet
- Adds a vmSet property to the Azure cloud provider and calls vmSet instead of the azure clients directly
- Adds LoadBalancer support based on vmSet

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Part of #43287.

**Special notes for your reviewer**:

**Release note**:

```release-note
Support LoadBalancer for Azure Virtual Machine Scale Sets
```

/assign @brendandburns
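As an orientation aid before the diff (editor's sketch, not text from the PR): the `VMSet` interface itself lives in the new `azure_vmsets.go`, which this excerpt does not show. Reconstructed only from the calls and implementations visible below, it looks roughly like this; the real definition may declare more methods or slightly different signatures:

```go
// Reconstructed from usage in this diff; not the literal azure_vmsets.go source.
type VMSet interface {
	// Instance lookups used by azure_instances.go.
	GetInstanceIDByNodeName(name string) (string, error)
	GetInstanceTypeByNodeName(name string) (string, error)
	GetNodeNameByProviderID(providerID string) (types.NodeName, error)
	GetIPByNodeName(name, vmSetName string) (string, error)
	GetZoneByNodeName(name string) (cloudprovider.Zone, error)

	// Load balancer plumbing used by azure_loadbalancer.go.
	GetPrimaryVMSetName() string
	GetVMSetNames(service *v1.Service, nodes []*v1.Node) (*[]string, error)
	EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error
	EnsureBackendPoolDeleted(poolID, vmSetName string) error
}
```

Both `availabilitySet` and `scaleSet` (added below) implement it, and `scaleSet` embeds an `availabilitySet` value so that master nodes that are not part of any scale set fall back to the standard path.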
commit 65a2367188
@@ -24,6 +24,7 @@ go_library(
         "azure_storageaccount.go",
         "azure_util.go",
         "azure_util_vmss.go",
+        "azure_vmsets.go",
         "azure_wrap.go",
         "azure_zones.go",
     ],
@@ -198,6 +198,7 @@ type Cloud struct {
     operationPollRateLimiter flowcontrol.RateLimiter
     resourceRequestBackoff   wait.Backoff
     metadata                 *InstanceMetadata
+    vmSet                    VMSet

     // Clients for vmss.
     VirtualMachineScaleSetsClient compute.VirtualMachineScaleSetsClient
@@ -346,16 +347,16 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
     az.SecurityGroupsClient = securityGroupsClient

     virtualMachineScaleSetVMsClient := compute.NewVirtualMachineScaleSetVMsClient(az.SubscriptionID)
-    az.VirtualMachineScaleSetVMsClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.VirtualMachineScaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.VirtualMachineScaleSetVMsClient.PollingDelay = 5 * time.Second
+    virtualMachineScaleSetVMsClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    virtualMachineScaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    virtualMachineScaleSetVMsClient.PollingDelay = 5 * time.Second
     configureUserAgent(&virtualMachineScaleSetVMsClient.Client)
     az.VirtualMachineScaleSetVMsClient = virtualMachineScaleSetVMsClient

     virtualMachineScaleSetsClient := compute.NewVirtualMachineScaleSetsClient(az.SubscriptionID)
-    az.VirtualMachineScaleSetsClient.BaseURI = az.Environment.ResourceManagerEndpoint
-    az.VirtualMachineScaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
-    az.VirtualMachineScaleSetsClient.PollingDelay = 5 * time.Second
+    virtualMachineScaleSetsClient.BaseURI = az.Environment.ResourceManagerEndpoint
+    virtualMachineScaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken)
+    virtualMachineScaleSetsClient.PollingDelay = 5 * time.Second
     configureUserAgent(&virtualMachineScaleSetsClient.Client)
     az.VirtualMachineScaleSetsClient = virtualMachineScaleSetsClient

@@ -421,6 +422,12 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
         az.MaximumLoadBalancerRuleCount = maximumLoadBalancerRuleCount
     }

+    if az.Config.VMType == vmTypeVMSS {
+        az.vmSet = newScaleSet(&az)
+    } else {
+        az.vmSet = newAvailabilitySet(&az)
+    }
+
     if err := initDiskControllers(&az); err != nil {
         return nil, err
     }
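The construction-time dispatch above is the only place the provider branches on the VM type; every caller afterwards goes through `az.vmSet`. A minimal standalone sketch of the same idea (editor's illustration, assuming the constant values `"standard"` and `"vmss"` — the constants are defined elsewhere in the package, outside this diff — and a hypothetical `newVMSet` helper; the PR inlines this in `NewCloud`):

```go
const (
	vmTypeStandard = "standard" // assumed value, defined outside this diff
	vmTypeVMSS     = "vmss"     // assumed value, defined outside this diff
)

// newVMSet is a hypothetical helper illustrating the dispatch.
func newVMSet(az *Cloud) VMSet {
	// Branch once at construction; callers never inspect vmType again.
	if az.Config.VMType == vmTypeVMSS {
		return newScaleSet(az)
	}
	return newAvailabilitySet(az)
}
```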
@@ -58,13 +58,13 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.Virtua
     return machine, exists, err
 }

-// GetScaleSetsVMWithRetry invokes az.getScaleSetsVM with exponential backoff retry
-func (az *Cloud) GetScaleSetsVMWithRetry(name types.NodeName) (compute.VirtualMachineScaleSetVM, bool, error) {
+// GetScaleSetsVMWithRetry invokes ss.getScaleSetVM with exponential backoff retry
+func (ss *scaleSet) GetScaleSetsVMWithRetry(name types.NodeName, scaleSetName string) (compute.VirtualMachineScaleSetVM, bool, error) {
     var machine compute.VirtualMachineScaleSetVM
     var exists bool
-    err := wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) {
+    err := wait.ExponentialBackoff(ss.resourceRequestBackoff, func() (bool, error) {
         var retryErr error
-        machine, exists, retryErr = az.getVmssVirtualMachine(name)
+        machine, exists, retryErr = ss.getScaleSetVM(string(name), scaleSetName)
         if retryErr != nil {
             glog.Errorf("GetScaleSetsVMWithRetry backoff: failure, will retry,err=%v", retryErr)
             return false, nil
@@ -22,7 +22,6 @@ import (
     "k8s.io/api/core/v1"
     "k8s.io/kubernetes/pkg/cloudprovider"

-    "github.com/Azure/azure-sdk-for-go/arm/compute"
     "github.com/golang/glog"
     "k8s.io/apimachinery/pkg/types"
 )
@@ -48,6 +47,7 @@ func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
         }
         return addresses, nil
     }
+
     ip, err := az.GetIPForMachineWithRetry(name)
     if err != nil {
         glog.V(2).Infof("NodeAddresses(%s) abort backoff", name)
@@ -64,7 +64,7 @@ func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
 // This method will not be called from the node that is requesting this ID. i.e. metadata service
 // and other local methods cannot be used here
 func (az *Cloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
-    name, err := splitProviderID(providerID)
+    name, err := az.vmSet.GetNodeNameByProviderID(providerID)
     if err != nil {
         return nil, err
     }
@@ -80,7 +80,7 @@ func (az *Cloud) ExternalID(name types.NodeName) (string, error) {
 // InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
 // If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
 func (az *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) {
-    name, err := splitProviderID(providerID)
+    name, err := az.vmSet.GetNodeNameByProviderID(providerID)
     if err != nil {
         return false, err
     }
@@ -118,70 +118,14 @@ func (az *Cloud) InstanceID(name types.NodeName) (string, error) {
         }
     }

-    if az.Config.VMType == vmTypeVMSS {
-        id, err := az.getVmssInstanceID(name)
-        if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
-            // Retry with standard type because master nodes may not belong to any vmss.
-            return az.getStandardInstanceID(name)
-        }
-
-        return id, err
-    }
-
-    return az.getStandardInstanceID(name)
-}
-
-func (az *Cloud) getVmssInstanceID(name types.NodeName) (string, error) {
-    var machine compute.VirtualMachineScaleSetVM
-    var exists bool
-    var err error
-    az.operationPollRateLimiter.Accept()
-    machine, exists, err = az.getVmssVirtualMachine(name)
-    if err != nil {
-        if az.CloudProviderBackoff {
-            glog.V(2).Infof("InstanceID(%s) backing off", name)
-            machine, exists, err = az.GetScaleSetsVMWithRetry(name)
-            if err != nil {
-                glog.V(2).Infof("InstanceID(%s) abort backoff", name)
-                return "", err
-            }
-        } else {
-            return "", err
-        }
-    } else if !exists {
-        return "", cloudprovider.InstanceNotFound
-    }
-    return *machine.ID, nil
-}
-
-func (az *Cloud) getStandardInstanceID(name types.NodeName) (string, error) {
-    var machine compute.VirtualMachine
-    var exists bool
-    var err error
-    az.operationPollRateLimiter.Accept()
-    machine, exists, err = az.getVirtualMachine(name)
-    if err != nil {
-        if az.CloudProviderBackoff {
-            glog.V(2).Infof("InstanceID(%s) backing off", name)
-            machine, exists, err = az.GetVirtualMachineWithRetry(name)
-            if err != nil {
-                glog.V(2).Infof("InstanceID(%s) abort backoff", name)
-                return "", err
-            }
-        } else {
-            return "", err
-        }
-    } else if !exists {
-        return "", cloudprovider.InstanceNotFound
-    }
-    return *machine.ID, nil
+    return az.vmSet.GetInstanceIDByNodeName(string(name))
 }

 // InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
 // This method will not be called from the node that is requesting this ID. i.e. metadata service
 // and other local methods cannot be used here
 func (az *Cloud) InstanceTypeByProviderID(providerID string) (string, error) {
-    name, err := splitProviderID(providerID)
+    name, err := az.vmSet.GetNodeNameByProviderID(providerID)
     if err != nil {
         return "", err
     }
@@ -207,46 +151,7 @@ func (az *Cloud) InstanceType(name types.NodeName) (string, error) {
         }
     }

-    if az.Config.VMType == vmTypeVMSS {
-        machineType, err := az.getVmssInstanceType(name)
-        if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
-            // Retry with standard type because master nodes may not belong to any vmss.
-            return az.getStandardInstanceType(name)
-        }
-
-        return machineType, err
-    }
-
-    return az.getStandardInstanceType(name)
-}
-
-// getVmssInstanceType gets instance with type vmss.
-func (az *Cloud) getVmssInstanceType(name types.NodeName) (string, error) {
-    machine, exists, err := az.getVmssVirtualMachine(name)
-    if err != nil {
-        glog.Errorf("error: az.InstanceType(%s), az.getVmssVirtualMachine(%s) err=%v", name, name, err)
-        return "", err
-    } else if !exists {
-        return "", cloudprovider.InstanceNotFound
-    }
-
-    if machine.Sku.Name != nil {
-        return *machine.Sku.Name, nil
-    }
-
-    return "", fmt.Errorf("instance type is not set")
-}
-
-// getStandardInstanceType gets instance with standard type.
-func (az *Cloud) getStandardInstanceType(name types.NodeName) (string, error) {
-    machine, exists, err := az.getVirtualMachine(name)
-    if err != nil {
-        glog.Errorf("error: az.InstanceType(%s), az.getVirtualMachine(%s) err=%v", name, name, err)
-        return "", err
-    } else if !exists {
-        return "", cloudprovider.InstanceNotFound
-    }
-    return string(machine.HardwareProfile.VMSize), nil
+    return az.vmSet.GetInstanceTypeByNodeName(string(name))
 }

 // AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances
@@ -255,8 +160,8 @@ func (az *Cloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
     return fmt.Errorf("not supported")
 }

-// CurrentNodeName returns the name of the node we are currently running on
-// On most clouds (e.g. GCE) this is the hostname, so we provide the hostname
+// CurrentNodeName returns the name of the node we are currently running on.
+// On Azure this is the hostname, so we just return the hostname.
 func (az *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) {
     return types.NodeName(hostname), nil
 }
@@ -23,49 +23,48 @@ import (
     "strings"

     "k8s.io/api/core/v1"
-    utilerrors "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/sets"
     serviceapi "k8s.io/kubernetes/pkg/api/v1/service"

-    "github.com/Azure/azure-sdk-for-go/arm/compute"
     "github.com/Azure/azure-sdk-for-go/arm/network"
     "github.com/Azure/go-autorest/autorest/to"
     "github.com/golang/glog"
-    "k8s.io/apimachinery/pkg/types"
 )

-// ServiceAnnotationLoadBalancerInternal is the annotation used on the service
-const ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal"
+const (
+    // ServiceAnnotationLoadBalancerInternal is the annotation used on the service
+    ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal"

-// ServiceAnnotationLoadBalancerInternalSubnet is the annotation used on the service
-// to specify what subnet it is exposed on
-const ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet"
+    // ServiceAnnotationLoadBalancerInternalSubnet is the annotation used on the service
+    // to specify what subnet it is exposed on
+    ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet"

-// ServiceAnnotationLoadBalancerMode is the annotation used on the service to specify the
-// Azure load balancer selection based on availability sets.
-// There are currently three possible load balancer selection modes:
-// 1. Default mode - the service has no annotation ("service.beta.kubernetes.io/azure-load-balancer-mode");
-//    in this case the load balancer of the primary availability set is selected
-// 2. "__auto__" mode - the service is annotated with the __auto__ value; the load balancer
-//    from any availability set that has the minimum number of rules associated with it is selected
-// 3. "as1,as2" mode - the load balancer from the specified availability sets that has the
-//    minimum number of rules associated with it is selected
-const ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode"
+    // ServiceAnnotationLoadBalancerMode is the annotation used on the service to specify the
+    // Azure load balancer selection based on availability sets.
+    // There are currently three possible load balancer selection modes:
+    // 1. Default mode - the service has no annotation ("service.beta.kubernetes.io/azure-load-balancer-mode");
+    //    in this case the load balancer of the primary availability set is selected
+    // 2. "__auto__" mode - the service is annotated with the __auto__ value; the load balancer
+    //    from any availability set that has the minimum number of rules associated with it is selected
+    // 3. "as1,as2" mode - the load balancer from the specified availability sets that has the
+    //    minimum number of rules associated with it is selected
+    ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode"

-// ServiceAnnotationLoadBalancerAutoModeValue is the annotation value used on the service to specify
-// Azure load balancer auto selection from the availability sets
-const ServiceAnnotationLoadBalancerAutoModeValue = "__auto__"
+    // ServiceAnnotationLoadBalancerAutoModeValue is the annotation value used on the service to specify
+    // Azure load balancer auto selection from the availability sets
+    ServiceAnnotationLoadBalancerAutoModeValue = "__auto__"

-// ServiceAnnotationDNSLabelName is the annotation specifying the DNS label name for the service.
-const ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name"
+    // ServiceAnnotationDNSLabelName is the annotation specifying the DNS label name for the service.
+    ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name"

-// ServiceAnnotationSharedSecurityRule is the annotation used on the service
-// to specify that the service should be exposed using an Azure security rule
-// that may be shared with other services, trading specificity of rules for an
-// increase in the number of services that can be exposed. This relies on the
-// Azure "augmented security rules" feature which at the time of writing is in
-// preview and available only in certain regions.
-const ServiceAnnotationSharedSecurityRule = "service.beta.kubernetes.io/azure-shared-securityrule"
+    // ServiceAnnotationSharedSecurityRule is the annotation used on the service
+    // to specify that the service should be exposed using an Azure security rule
+    // that may be shared with other services, trading specificity of rules for an
+    // increase in the number of services that can be exposed. This relies on the
+    // Azure "augmented security rules" feature which at the time of writing is in
+    // preview and available only in certain regions.
+    ServiceAnnotationSharedSecurityRule = "service.beta.kubernetes.io/azure-shared-securityrule"
+)

 // GetLoadBalancer returns whether the specified load balancer exists, and
 // if so, what its status is.
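To make the annotation constants above concrete, here is an illustrative construction (editor's sketch, not code from this PR) of an internal LoadBalancer Service pinned to two named VM sets; the service name is hypothetical, and the annotation values follow the modes documented in the comment block:

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Annotation keys copied from the const block above.
const (
	annInternal = "service.beta.kubernetes.io/azure-load-balancer-internal"
	annMode     = "service.beta.kubernetes.io/azure-load-balancer-mode"
)

func main() {
	// An internal LoadBalancer Service restricted to the "as1" and "as2" VM sets.
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name: "demo", // hypothetical name
			Annotations: map[string]string{
				annInternal: "true",
				annMode:     "as1,as2", // or "__auto__" for minimum-rule selection
			},
		},
		Spec: v1.ServiceSpec{Type: v1.ServiceTypeLoadBalancer},
	}
	fmt.Println(svc.Annotations[annMode])
}
```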
@@ -166,15 +165,16 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi
     return nil
 }

-// getServiceLoadBalancer gets the loadbalancer for the service if it already exists
-// If wantLb is TRUE then it selects a new load balancer
+// getServiceLoadBalancer gets the loadbalancer for the service if it already exists.
+// If wantLb is TRUE then it selects a new load balancer.
 // In case the selected load balancer does not exist it returns network.LoadBalancer struct
-// with added metadata (such as name, location) and existsLB set to FALSE
-// By default - cluster default LB is returned
+// with added metadata (such as name, location) and existsLB set to FALSE.
+// By default - cluster default LB is returned.
 func (az *Cloud) getServiceLoadBalancer(service *v1.Service, clusterName string, nodes []*v1.Node, wantLb bool) (lb *network.LoadBalancer, status *v1.LoadBalancerStatus, exists bool, err error) {
     isInternal := requiresInternalLoadBalancer(service)
     var defaultLB *network.LoadBalancer
-    defaultLBName := az.getLoadBalancerName(clusterName, az.Config.PrimaryAvailabilitySetName, isInternal)
+    primaryVMSetName := az.vmSet.GetPrimaryVMSetName()
+    defaultLBName := az.getLoadBalancerName(clusterName, primaryVMSetName, isInternal)

     existingLBs, err := az.ListLBWithRetry()
     if err != nil {
@@ -234,18 +234,19 @@ func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, exi
     isInternal := requiresInternalLoadBalancer(service)
     serviceName := getServiceName(service)
     glog.V(3).Infof("selectLoadBalancer(%s): isInternal(%s) - start", serviceName, isInternal)
-    availabilitySetNames, err := az.getLoadBalancerAvailabilitySetNames(service, nodes)
+    vmSetNames, err := az.vmSet.GetVMSetNames(service, nodes)
     if err != nil {
-        glog.Errorf("az.selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - az.getLoadBalancerAvailabilitySetNames failed, err=(%v)", clusterName, serviceName, isInternal, err)
+        glog.Errorf("az.selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - az.GetVMSetNames failed, err=(%v)", clusterName, serviceName, isInternal, err)
         return nil, false, err
     }
-    glog.Infof("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - availabilitysetsnames %v", clusterName, serviceName, isInternal, *availabilitySetNames)
+    glog.Infof("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - vmSetNames %v", clusterName, serviceName, isInternal, *vmSetNames)

     mapExistingLBs := map[string]network.LoadBalancer{}
     for _, lb := range *existingLBs {
         mapExistingLBs[*lb.Name] = lb
     }
     selectedLBRuleCount := math.MaxInt32
-    for _, currASName := range *availabilitySetNames {
+    for _, currASName := range *vmSetNames {
         currLBName := az.getLoadBalancerName(clusterName, currASName, isInternal)
         lb, exists := mapExistingLBs[currLBName]
         if !exists {
@@ -272,13 +273,13 @@ func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, exi
     }

     if selectedLB == nil {
-        err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - unable to find load balancer for selected availability sets %v", clusterName, serviceName, isInternal, *availabilitySetNames)
+        err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - unable to find load balancer for selected VM sets %v", clusterName, serviceName, isInternal, *vmSetNames)
         glog.Error(err)
         return nil, false, err
     }
     // validate if the selected LB has not exceeded the MaximumLoadBalancerRuleCount
     if az.Config.MaximumLoadBalancerRuleCount != 0 && selectedLBRuleCount >= az.Config.MaximumLoadBalancerRuleCount {
-        err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - all available load balancers have exceeded maximum rule limit %d, availabilitysetnames (%v)", clusterName, serviceName, isInternal, selectedLBRuleCount, *availabilitySetNames)
+        err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - all available load balancers have exceeded maximum rule limit %d, vmSetNames (%v)", clusterName, serviceName, isInternal, selectedLBRuleCount, *vmSetNames)
         glog.Error(err)
         return selectedLB, existsLb, err
     }
@@ -741,6 +742,11 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
             // because an Azure load balancer cannot have an empty FrontendIPConfigurations collection
             glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)

+            // Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB.
+            vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
+            az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName)
+
+            // Remove the LB.
             az.operationPollRateLimiter.Accept()
             glog.V(10).Infof("LoadBalancerClient.Delete(%q): start", lbName)
             err := az.DeleteLBWithRetry(lbName)
@@ -761,23 +767,10 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,

     if wantLb && nodes != nil {
         // Add the machines to the backend pool if they're not already
-        availabilitySetName := az.mapLoadBalancerNameToAvailabilitySet(lbName, clusterName)
-        hostUpdates := make([]func() error, len(nodes))
-        for i, node := range nodes {
-            localNodeName := node.Name
-            f := func() error {
-                err := az.ensureHostInPool(serviceName, types.NodeName(localNodeName), lbBackendPoolID, availabilitySetName)
-                if err != nil {
-                    return fmt.Errorf("ensure(%s): lb(%s) - failed to ensure host in pool: %q", serviceName, lbName, err)
-                }
-                return nil
-            }
-            hostUpdates[i] = f
-        }
-
-        errs := utilerrors.AggregateGoroutines(hostUpdates...)
-        if errs != nil {
-            return nil, utilerrors.Flatten(errs)
+        vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
+        err := az.vmSet.EnsureHostsInPool(serviceName, nodes, lbBackendPoolID, vmSetName)
+        if err != nil {
+            return nil, err
         }
     }

@@ -1246,95 +1239,6 @@ func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) b
     return false
 }

-// This ensures the given VM's Primary NIC's Primary IP Configuration is
-// participating in the specified LoadBalancer Backend Pool.
-func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, availabilitySetName string) error {
-    var machine compute.VirtualMachine
-    vmName := mapNodeNameToVMName(nodeName)
-    az.operationPollRateLimiter.Accept()
-    glog.V(10).Infof("VirtualMachinesClient.Get(%q): start", vmName)
-    machine, err := az.VirtualMachineClientGetWithRetry(az.ResourceGroup, vmName, "")
-    if err != nil {
-        glog.V(2).Infof("ensureHostInPool(%s, %s, %s) abort backoff", serviceName, nodeName, backendPoolID)
-        return err
-    }
-    glog.V(10).Infof("VirtualMachinesClient.Get(%q): end", vmName)
-
-    primaryNicID, err := getPrimaryInterfaceID(machine)
-    if err != nil {
-        return err
-    }
-    nicName, err := getLastSegment(primaryNicID)
-    if err != nil {
-        return err
-    }
-
-    // Check availability set
-    if availabilitySetName != "" {
-        expectedAvailabilitySetName := az.getAvailabilitySetID(availabilitySetName)
-        if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) {
-            glog.V(3).Infof(
-                "nicupdate(%s): skipping nic (%s) since it is not in the availabilitySet(%s)",
-                serviceName, nicName, availabilitySetName)
-            return nil
-        }
-    }
-
-    az.operationPollRateLimiter.Accept()
-    glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
-    nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "")
-    glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
-    if err != nil {
-        return err
-    }
-
-    var primaryIPConfig *network.InterfaceIPConfiguration
-    primaryIPConfig, err = getPrimaryIPConfig(nic)
-    if err != nil {
-        return err
-    }
-
-    foundPool := false
-    newBackendPools := []network.BackendAddressPool{}
-    if primaryIPConfig.LoadBalancerBackendAddressPools != nil {
-        newBackendPools = *primaryIPConfig.LoadBalancerBackendAddressPools
-    }
-    for _, existingPool := range newBackendPools {
-        if strings.EqualFold(backendPoolID, *existingPool.ID) {
-            foundPool = true
-            break
-        }
-    }
-    if !foundPool {
-        newBackendPools = append(newBackendPools,
-            network.BackendAddressPool{
-                ID: to.StringPtr(backendPoolID),
-            })
-
-        primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools
-
-        glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName)
-        az.operationPollRateLimiter.Accept()
-        glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): start", *nic.Name)
-        respChan, errChan := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
-        resp := <-respChan
-        err := <-errChan
-        glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name)
-        if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
-            glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err)
-            retryErr := az.CreateOrUpdateInterfaceWithRetry(nic)
-            if retryErr != nil {
-                err = retryErr
-                glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName)
-            }
-        }
-        if err != nil {
-            return err
-        }
-    }
-    return nil
-}
-
 // Check if service requires an internal load balancer.
 func requiresInternalLoadBalancer(service *v1.Service) bool {
     if l, ok := service.Annotations[ServiceAnnotationLoadBalancerInternal]; ok {
@@ -1354,28 +1258,28 @@ func subnet(service *v1.Service) *string {
     return nil
 }

-// getServiceLoadBalancerMode parses the mode value
-// if the value is __auto__ it returns isAuto = TRUE
-// if anything else it returns the unique availability set names after trimming spaces
-func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, availabilitySetNames []string) {
+// getServiceLoadBalancerMode parses the mode value.
+// if the value is __auto__ it returns isAuto = TRUE.
+// if anything else it returns the unique VM set names after trimming spaces.
+func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, vmSetNames []string) {
     mode, hasMode := service.Annotations[ServiceAnnotationLoadBalancerMode]
     mode = strings.TrimSpace(mode)
     isAuto = strings.EqualFold(mode, ServiceAnnotationLoadBalancerAutoModeValue)
     if !isAuto {
         // Break up list of "AS1,AS2"
-        availabilitySetParsedList := strings.Split(mode, ",")
+        vmSetParsedList := strings.Split(mode, ",")

-        // Trim the availability set names and remove duplicates
+        // Trim the VM set names and remove duplicates
         //  e.g. {"AS1"," AS2", "AS3", "AS3"} => {"AS1", "AS2", "AS3"}
-        availabilitySetNameSet := sets.NewString()
-        for _, v := range availabilitySetParsedList {
-            availabilitySetNameSet.Insert(strings.TrimSpace(v))
+        vmSetNameSet := sets.NewString()
+        for _, v := range vmSetParsedList {
+            vmSetNameSet.Insert(strings.TrimSpace(v))
         }

-        availabilitySetNames = availabilitySetNameSet.List()
+        vmSetNames = vmSetNameSet.List()
     }

-    return hasMode, isAuto, availabilitySetNames
+    return hasMode, isAuto, vmSetNames
 }

 func useSharedSecurityRule(service *v1.Service) bool {
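A quick worked example of the parsing behavior above (editor's illustration, not part of the PR): trimming and de-duplication mean `" as1 ,as2,as2"` and `"as1,as2"` produce the same sorted name list.

```go
package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/util/sets"
)

// parseVMSetNames mirrors the non-__auto__ branch of getServiceLoadBalancerMode.
func parseVMSetNames(mode string) []string {
	vmSetNameSet := sets.NewString()
	for _, v := range strings.Split(mode, ",") {
		vmSetNameSet.Insert(strings.TrimSpace(v))
	}
	return vmSetNameSet.List() // List() returns the names sorted
}

func main() {
	fmt.Println(parseVMSetNames(" as1 ,as2,as2")) // [as1 as2]
	fmt.Println(parseVMSetNames("as1,as2"))       // [as1 as2]
}
```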
@@ -870,6 +870,7 @@ func getTestCloud() (az *Cloud) {
     az.SecurityGroupsClient = newFakeAzureNSGClient()
     az.VirtualMachinesClient = newFakeAzureVirtualMachinesClient()
     az.InterfacesClient = newFakeAzureInterfacesClient()
+    az.vmSet = newAvailabilitySet(az)

     return az
 }
@@ -1631,7 +1632,8 @@ func TestDecodeInstanceInfo(t *testing.T) {
     }
 }

-func TestSplitProviderID(t *testing.T) {
+func TestGetNodeNameByProviderID(t *testing.T) {
+    az := getTestCloud()
     providers := []struct {
         providerID string
         name       types.NodeName
@@ -1666,7 +1668,7 @@ func TestSplitProviderID(t *testing.T) {
     }

     for _, test := range providers {
-        name, err := splitProviderID(test.providerID)
+        name, err := az.vmSet.GetNodeNameByProviderID(test.providerID)
         if (err != nil) != test.fail {
             t.Errorf("Expected to fail=%t, with pattern %v", test.fail, test)
         }
@@ -30,8 +30,10 @@ import (

     "github.com/Azure/azure-sdk-for-go/arm/compute"
     "github.com/Azure/azure-sdk-for-go/arm/network"
+    "github.com/Azure/go-autorest/autorest/to"
     "github.com/golang/glog"
     "k8s.io/apimachinery/pkg/types"
+    utilerrors "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/sets"
 )
@@ -54,6 +56,7 @@ const (
     nodeLabelRole = "kubernetes.io/role"
 )

+var errNotInVMSet = errors.New("vm is not in the vmset")
 var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`)

 // returns the full identifier of a machine
@@ -133,115 +136,22 @@ func (az *Cloud) getpublicIPAddressID(pipName string) string {
         pipName)
 }

-// getLoadBalancerAvailabilitySetNames selects all possible availability sets for
-// the service load balancer. If the service has no load balancer mode annotation it
-// returns the primary availability set; if a service annotation for the load balancer
-// availability set exists it returns the eligible availability sets.
-func (az *Cloud) getLoadBalancerAvailabilitySetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) {
-    hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service)
-    if !hasMode {
-        // no mode specified in service annotation, default to PrimaryAvailabilitySetName
-        availabilitySetNames = &[]string{az.Config.PrimaryAvailabilitySetName}
-        return availabilitySetNames, nil
-    }
-    availabilitySetNames, err = az.getAgentPoolAvailabiliySets(nodes)
-    if err != nil {
-        glog.Errorf("az.getLoadBalancerAvailabilitySetNames - getAgentPoolAvailabiliySets failed err=(%v)", err)
-        return nil, err
-    }
-    if len(*availabilitySetNames) == 0 {
-        glog.Errorf("az.getLoadBalancerAvailabilitySetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes))
-        return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes))
-    }
-    // sort the list to have deterministic selection
-    sort.Strings(*availabilitySetNames)
-    if !isAuto {
-        if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 {
-            return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value")
-        }
-        // validate availability set exists
-        var found bool
-        for sasx := range serviceAvailabilitySetNames {
-            for asx := range *availabilitySetNames {
-                if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) {
-                    found = true
-                    serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx]
-                    break
-                }
-            }
-            if !found {
-                glog.Errorf("az.getLoadBalancerAvailabilitySetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx])
-                return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx])
-            }
-        }
-        availabilitySetNames = &serviceAvailabilitySetNames
-    }
-
-    return availabilitySetNames, nil
-}
-
-// lists the virtual machines for the resource group and then builds
-// a list of availability sets that match the nodes available to k8s
-func (az *Cloud) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) {
-    vms, err := az.VirtualMachineClientListWithRetry()
-    if err != nil {
-        glog.Errorf("az.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err)
-        return nil, err
-    }
-    vmNameToAvailabilitySetID := make(map[string]string, len(vms))
-    for vmx := range vms {
-        vm := vms[vmx]
-        if vm.AvailabilitySet != nil {
-            vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID
-        }
-    }
-    availabilitySetIDs := sets.NewString()
-    agentPoolAvailabilitySets = &[]string{}
-    for nx := range nodes {
-        nodeName := (*nodes[nx]).Name
-        if isMasterNode(nodes[nx]) {
-            continue
-        }
-        asID, ok := vmNameToAvailabilitySetID[nodeName]
-        if !ok {
-            glog.Errorf("az.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName)
-            return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName)
-        }
-        if availabilitySetIDs.Has(asID) {
-            // already added in the list
-            continue
-        }
-        asName, err := getLastSegment(asID)
-        if err != nil {
-            glog.Errorf("az.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err)
-            return nil, err
-        }
-        // AvailabilitySet IDs are currently upper-cased in an indeterministic way.
-        // We want to keep them lower case, until the IDs get fixed.
-        asName = strings.ToLower(asName)
-
-        *agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName)
-    }
-
-    return agentPoolAvailabilitySets, nil
-}
-
-func (az *Cloud) mapLoadBalancerNameToAvailabilitySet(lbName string, clusterName string) (availabilitySetName string) {
-    availabilitySetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix)
+func (az *Cloud) mapLoadBalancerNameToVMSet(lbName string, clusterName string) (vmSetName string) {
+    vmSetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix)
     if strings.EqualFold(clusterName, lbName) {
-        availabilitySetName = az.Config.PrimaryAvailabilitySetName
+        vmSetName = az.vmSet.GetPrimaryVMSetName()
     }

-    return availabilitySetName
+    return vmSetName
 }

 // For a load balancer, all frontend ip should reference either a subnet or publicIpAddress.
 // Thus Azure do not allow mixed type (public and internal) load balancer.
 // So we'd have a separate name for internal load balancer.
 // This would be the name for Azure LoadBalancer resource.
-func (az *Cloud) getLoadBalancerName(clusterName string, availabilitySetName string, isInternal bool) string {
-    lbNamePrefix := availabilitySetName
-    if strings.EqualFold(availabilitySetName, az.Config.PrimaryAvailabilitySetName) {
+func (az *Cloud) getLoadBalancerName(clusterName string, vmSetName string, isInternal bool) string {
+    lbNamePrefix := vmSetName
+    if strings.EqualFold(vmSetName, az.vmSet.GetPrimaryVMSetName()) {
         lbNamePrefix = clusterName
     }
     if isInternal {
@@ -402,67 +312,7 @@ outer:
 }

 func (az *Cloud) getIPForMachine(nodeName types.NodeName) (string, error) {
-    if az.Config.VMType == vmTypeVMSS {
-        ip, err := az.getIPForVmssMachine(nodeName)
-        if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
-            return az.getIPForStandardMachine(nodeName)
-        }
-
-        return ip, err
-    }
-
-    return az.getIPForStandardMachine(nodeName)
-}
-
-func (az *Cloud) getIPForStandardMachine(nodeName types.NodeName) (string, error) {
-    az.operationPollRateLimiter.Accept()
-    machine, exists, err := az.getVirtualMachine(nodeName)
-    if !exists {
-        return "", cloudprovider.InstanceNotFound
-    }
-    if err != nil {
-        glog.Errorf("error: az.getIPForMachine(%s), az.getVirtualMachine(%s), err=%v", nodeName, nodeName, err)
-        return "", err
-    }
-
-    nicID, err := getPrimaryInterfaceID(machine)
-    if err != nil {
-        glog.Errorf("error: az.getIPForMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err)
-        return "", err
-    }
-
-    nicName, err := getLastSegment(nicID)
-    if err != nil {
-        glog.Errorf("error: az.getIPForMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err)
-        return "", err
-    }
-
-    az.operationPollRateLimiter.Accept()
-    glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
-    nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "")
-    glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
-    if err != nil {
-        glog.Errorf("error: az.getIPForMachine(%s), az.InterfacesClient.Get(%s, %s, %s), err=%v", nodeName, az.ResourceGroup, nicName, "", err)
-        return "", err
-    }
-
-    ipConfig, err := getPrimaryIPConfig(nic)
-    if err != nil {
-        glog.Errorf("error: az.getIPForMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err)
-        return "", err
-    }
-
-    targetIP := *ipConfig.PrivateIPAddress
-    return targetIP, nil
-}
-
-// splitProviderID converts a providerID to a NodeName.
-func splitProviderID(providerID string) (types.NodeName, error) {
-    matches := providerIDRE.FindStringSubmatch(providerID)
-    if len(matches) != 2 {
-        return "", errors.New("error splitting providerID")
-    }
-    return types.NodeName(matches[1]), nil
+    return az.vmSet.GetIPByNodeName(string(nodeName), "")
 }

 var polyTable = crc32.MakeTable(crc32.Koopman)
@@ -519,3 +369,333 @@ func ExtractDiskData(diskData interface{}) (provisioningState string, diskState
     }
     return provisioningState, diskState, nil
 }
+
+// availabilitySet implements VMSet interface for Azure availability sets.
+type availabilitySet struct {
+    *Cloud
+}
+
+// newAvailabilitySet creates a new availabilitySet.
+func newAvailabilitySet(az *Cloud) VMSet {
+    return &availabilitySet{
+        Cloud: az,
+    }
+}
+
+// GetInstanceIDByNodeName gets the cloud provider ID by node name.
+// It must return ("", cloudprovider.InstanceNotFound) if the instance does
+// not exist or is no longer running.
+func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) {
+    var machine compute.VirtualMachine
+    var exists bool
+    var err error
+
+    as.operationPollRateLimiter.Accept()
+    machine, exists, err = as.getVirtualMachine(types.NodeName(name))
+    if err != nil {
+        if as.CloudProviderBackoff {
+            glog.V(2).Infof("InstanceID(%s) backing off", name)
+            machine, exists, err = as.GetVirtualMachineWithRetry(types.NodeName(name))
+            if err != nil {
+                glog.V(2).Infof("InstanceID(%s) abort backoff", name)
+                return "", err
+            }
+        } else {
+            return "", err
+        }
+    } else if !exists {
+        return "", cloudprovider.InstanceNotFound
+    }
+    return *machine.ID, nil
+}
+
+// GetNodeNameByProviderID gets the node name by provider ID.
+func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
+    // NodeName is part of providerID for standard instances.
+    matches := providerIDRE.FindStringSubmatch(providerID)
+    if len(matches) != 2 {
+        return "", errors.New("error splitting providerID")
+    }
+
+    return types.NodeName(matches[1]), nil
+}
+
+// GetInstanceTypeByNodeName gets the instance type by node name.
+func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) {
+    machine, exists, err := as.getVirtualMachine(types.NodeName(name))
+    if err != nil {
+        glog.Errorf("error: as.GetInstanceTypeByNodeName(%s), as.getVirtualMachine(%s) err=%v", name, name, err)
+        return "", err
+    } else if !exists {
+        return "", cloudprovider.InstanceNotFound
+    }
+
+    return string(machine.HardwareProfile.VMSize), nil
+}
+
+// GetZoneByNodeName gets zone from instance view.
+func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
+    vm, err := as.VirtualMachinesClient.Get(as.ResourceGroup, name, compute.InstanceView)
+    if err != nil {
+        return cloudprovider.Zone{}, err
+    }
+
+    failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))
+    zone := cloudprovider.Zone{
+        FailureDomain: failureDomain,
+        Region:        *(vm.Location),
+    }
+    return zone, nil
+}
+
+// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
+// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType.
+func (as *availabilitySet) GetPrimaryVMSetName() string {
+    return as.Config.PrimaryAvailabilitySetName
+}
+
+// GetIPByNodeName gets machine IP by node name.
+func (as *availabilitySet) GetIPByNodeName(name, vmSetName string) (string, error) {
+    nic, err := as.GetPrimaryInterface(name, vmSetName)
+    if err != nil {
+        return "", err
+    }
+
+    ipConfig, err := getPrimaryIPConfig(nic)
+    if err != nil {
+        glog.Errorf("error: as.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", name, nic, err)
+        return "", err
+    }
+
+    targetIP := *ipConfig.PrivateIPAddress
+    return targetIP, nil
+}
+
+// getAgentPoolAvailabiliySets lists the virtual machines for the resource group and then builds
+// a list of availability sets that match the nodes available to k8s.
+func (as *availabilitySet) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) {
+    vms, err := as.VirtualMachineClientListWithRetry()
+    if err != nil {
+        glog.Errorf("as.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err)
+        return nil, err
+    }
+    vmNameToAvailabilitySetID := make(map[string]string, len(vms))
+    for vmx := range vms {
+        vm := vms[vmx]
+        if vm.AvailabilitySet != nil {
+            vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID
+        }
+    }
+    availabilitySetIDs := sets.NewString()
+    agentPoolAvailabilitySets = &[]string{}
+    for nx := range nodes {
+        nodeName := (*nodes[nx]).Name
+        if isMasterNode(nodes[nx]) {
+            continue
+        }
+        asID, ok := vmNameToAvailabilitySetID[nodeName]
+        if !ok {
+            glog.Errorf("as.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName)
+            return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName)
+        }
+        if availabilitySetIDs.Has(asID) {
+            // already added in the list
+            continue
+        }
+        asName, err := getLastSegment(asID)
+        if err != nil {
+            glog.Errorf("as.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err)
+            return nil, err
+        }
+        // AvailabilitySet IDs are currently upper-cased in an indeterministic way.
+        // We want to keep them lower case, until the IDs get fixed.
+        asName = strings.ToLower(asName)
+
+        *agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName)
+    }
+
+    return agentPoolAvailabilitySets, nil
+}
+
+// GetVMSetNames selects all possible availability sets or scale sets
+// (depending on the configured vmType) for the service load balancer. If the service
+// has no load balancer mode annotation it returns the primary VMSet; if a service
+// annotation for the load balancer exists it returns the eligible VMSets.
+func (as *availabilitySet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) {
+    hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service)
+    if !hasMode {
+        // no mode specified in service annotation, default to PrimaryAvailabilitySetName
+        availabilitySetNames = &[]string{as.Config.PrimaryAvailabilitySetName}
+        return availabilitySetNames, nil
+    }
+    availabilitySetNames, err = as.getAgentPoolAvailabiliySets(nodes)
+    if err != nil {
+        glog.Errorf("as.GetVMSetNames - getAgentPoolAvailabiliySets failed err=(%v)", err)
+        return nil, err
+    }
+    if len(*availabilitySetNames) == 0 {
+        glog.Errorf("as.GetVMSetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes))
+        return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes))
+    }
+    // sort the list to have deterministic selection
+    sort.Strings(*availabilitySetNames)
+    if !isAuto {
+        if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 {
+            return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value")
+        }
+        // validate availability set exists
+        var found bool
+        for sasx := range serviceAvailabilitySetNames {
+            for asx := range *availabilitySetNames {
+                if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) {
+                    found = true
+                    serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx]
+                    break
+                }
+            }
+            if !found {
+                glog.Errorf("as.GetVMSetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx])
+                return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx])
+            }
+        }
+        availabilitySetNames = &serviceAvailabilitySetNames
+    }
+
+    return availabilitySetNames, nil
+}
+
+// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
+func (as *availabilitySet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) {
+    var machine compute.VirtualMachine
+
+    as.operationPollRateLimiter.Accept()
+    glog.V(10).Infof("VirtualMachinesClient.Get(%q): start", nodeName)
+    machine, err := as.VirtualMachineClientGetWithRetry(as.ResourceGroup, nodeName, "")
+    if err != nil {
+        glog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName)
+        return network.Interface{}, err
+    }
+    glog.V(10).Infof("VirtualMachinesClient.Get(%q): end", nodeName)
+
+    primaryNicID, err := getPrimaryInterfaceID(machine)
+    if err != nil {
+        return network.Interface{}, err
+    }
+    nicName, err := getLastSegment(primaryNicID)
+    if err != nil {
+        return network.Interface{}, err
+    }
+
+    // Check availability set
+    if vmSetName != "" {
+        expectedAvailabilitySetName := as.getAvailabilitySetID(vmSetName)
+        if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) {
+            glog.V(3).Infof(
+                "GetPrimaryInterface: nic (%s) is not in the availabilitySet(%s)", nicName, vmSetName)
+            return network.Interface{}, errNotInVMSet
+        }
+    }
+
+    as.operationPollRateLimiter.Accept()
+    glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
+    nic, err := as.InterfacesClient.Get(as.ResourceGroup, nicName, "")
+    glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
+    if err != nil {
+        return network.Interface{}, err
+    }
+
+    return nic, nil
+}
+
+// ensureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
+// participating in the specified LoadBalancer Backend Pool.
+func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, vmSetName string) error {
+    vmName := mapNodeNameToVMName(nodeName)
+    nic, err := as.GetPrimaryInterface(vmName, vmSetName)
+    if err != nil {
+        if err == errNotInVMSet {
+            glog.V(3).Infof("ensureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName)
+            return nil
+        }
+
+        glog.Errorf("error: az.ensureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err)
+        return err
+    }
+
+    var primaryIPConfig *network.InterfaceIPConfiguration
+    primaryIPConfig, err = getPrimaryIPConfig(nic)
+    if err != nil {
+        return err
+    }
+
+    foundPool := false
+    newBackendPools := []network.BackendAddressPool{}
+    if primaryIPConfig.LoadBalancerBackendAddressPools != nil {
+        newBackendPools = *primaryIPConfig.LoadBalancerBackendAddressPools
+    }
+    for _, existingPool := range newBackendPools {
+        if strings.EqualFold(backendPoolID, *existingPool.ID) {
+            foundPool = true
+            break
+        }
+    }
+    if !foundPool {
+        newBackendPools = append(newBackendPools,
+            network.BackendAddressPool{
+                ID: to.StringPtr(backendPoolID),
+            })
+
+        primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools
+
+        nicName := *nic.Name
+        glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName)
+        as.operationPollRateLimiter.Accept()
+        glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): start", *nic.Name)
+        respChan, errChan := as.InterfacesClient.CreateOrUpdate(as.ResourceGroup, *nic.Name, nic, nil)
+        resp := <-respChan
+        err := <-errChan
+        glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name)
+        if as.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
+            glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err)
+            retryErr := as.CreateOrUpdateInterfaceWithRetry(nic)
+            if retryErr != nil {
+                err = retryErr
+                glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName)
+            }
+        }
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+// EnsureHostsInPool ensures the given nodes' primary IP configurations are
+// participating in the specified LoadBalancer Backend Pool.
+func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error {
+    hostUpdates := make([]func() error, len(nodes))
+    for i, node := range nodes {
+        localNodeName := node.Name
+        f := func() error {
+            err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName)
+            if err != nil {
+                return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", serviceName, backendPoolID, err)
+            }
+            return nil
+        }
+        hostUpdates[i] = f
+    }
+
+    errs := utilerrors.AggregateGoroutines(hostUpdates...)
+    if errs != nil {
+        return utilerrors.Flatten(errs)
+    }
+
+    return nil
+}
+
+// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
+func (as *availabilitySet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
+    // Do nothing for availability set.
+    return nil
+}
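One design note on `EnsureHostsInPool` above: the per-node NIC updates are fanned out with `utilerrors.AggregateGoroutines`, which runs each closure in its own goroutine and collects the non-nil errors; `Flatten` then unwraps nested aggregates into a single flat error. A self-contained illustration (editor's sketch, not code from the PR):

```go
package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	// Mirrors the fan-out in EnsureHostsInPool: each update runs in its own
	// goroutine and the non-nil errors are aggregated.
	errs := utilerrors.AggregateGoroutines(
		func() error { return nil },
		func() error { return errors.New("node-1: update failed") },
	)
	if errs != nil {
		fmt.Println(utilerrors.Flatten(errs))
	}
}
```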
@@ -23,7 +23,7 @@ import (
     "github.com/stretchr/testify/assert"
 )

-func TestGetVmssInstanceID(t *testing.T) {
+func TestGetScaleSetVMInstanceID(t *testing.T) {
     tests := []struct {
         msg         string
         machineName string
@@ -43,7 +43,7 @@ func TestGetVmssInstanceID(t *testing.T) {
     }

     for i, test := range tests {
-        instanceID, err := getVmssInstanceID(test.machineName)
+        instanceID, err := getScaleSetVMInstanceID(test.machineName)
         if test.expectError {
             assert.Error(t, err, fmt.Sprintf("TestCase[%d]: %s", i, test.msg))
         } else {
@ -17,51 +17,222 @@ limitations under the License.
|
||||
package azure
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/arm/compute"
|
||||
"github.com/Azure/azure-sdk-for-go/arm/network"
|
||||
"github.com/Azure/go-autorest/autorest/to"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
)
|
||||
|
||||
func (az *Cloud) getIPForVmssMachine(nodeName types.NodeName) (string, error) {
|
||||
az.operationPollRateLimiter.Accept()
|
||||
machine, exists, err := az.getVmssVirtualMachine(nodeName)
|
||||
var (
|
||||
// ErrorNotVmssInstance indicates an instance is not belongint to any vmss.
|
||||
ErrorNotVmssInstance = errors.New("not a vmss instance")
|
||||
|
||||
scaleSetNameRE = regexp.MustCompile(`^/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
|
||||
)
|
||||
|
||||
// scaleSet implements VMSet interface for Azure scale set.
|
||||
type scaleSet struct {
|
||||
*Cloud
|
||||
|
||||
// availabilitySet is also required for scaleSet because some instances
|
||||
// (e.g. master nodes) may not belong to any scale sets.
|
||||
availabilitySet VMSet
|
||||
}
|
||||
|
||||
// newScaleSet creates a new scaleSet.
|
||||
func newScaleSet(az *Cloud) VMSet {
|
||||
return &scaleSet{
|
||||
Cloud: az,
|
||||
availabilitySet: newAvailabilitySet(az),
|
||||
}
|
||||
}
|
||||
|
||||
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
// It must return ("", cloudprovider.InstanceNotFound) if the instance does
// not exist or is no longer running.
func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
	instanceID, err := ss.getScaleSetInstanceIDByName(name, ss.PrimaryScaleSetName)
	if err != nil {
		if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
			// Retry with standard type because master nodes may not belong to any vmss.
			// TODO: find a better way to identify the type of VM.
			return ss.availabilitySet.GetInstanceIDByNodeName(name)
		}

		return "", err
	}

	return instanceID, nil
}

func (ss *scaleSet) getScaleSetInstanceIDByName(name, scaleSetName string) (string, error) {
	var machine compute.VirtualMachineScaleSetVM
	var exists bool
	var err error

	ss.operationPollRateLimiter.Accept()
	machine, exists, err = ss.getScaleSetVM(name, scaleSetName)
	if err != nil {
		if ss.CloudProviderBackoff {
			glog.V(2).Infof("InstanceID(%s) backing off", name)
			machine, exists, err = ss.GetScaleSetsVMWithRetry(types.NodeName(name), scaleSetName)
			if err != nil {
				glog.V(2).Infof("InstanceID(%s) abort backoff", name)
				return "", err
			}
		} else {
			return "", err
		}
	} else if !exists {
		return "", cloudprovider.InstanceNotFound
	}

	return *machine.ID, nil
}

func (ss *scaleSet) getScaleSetVM(nodeName, scaleSetName string) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) {
	instanceID, err := getScaleSetVMInstanceID(nodeName)
	if err != nil {
		return vm, false, err
	}

	return ss.getScaleSetVMByID(instanceID, scaleSetName)
}

func (ss *scaleSet) getScaleSetVMByID(instanceID, scaleSetName string) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) {
	var realErr error

	// scaleSetName is required to query VM info.
	if scaleSetName == "" {
		scaleSetName = ss.PrimaryScaleSetName
	}

	ss.operationPollRateLimiter.Accept()
	glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): start", instanceID)
	vm, err = ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, scaleSetName, instanceID)
	glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): end", instanceID)

	exists, realErr = checkResourceExistsFromError(err)
	if realErr != nil {
		return vm, false, realErr
	}

	if !exists {
		return vm, false, nil
	}

	return vm, exists, err
}

// GetNodeNameByProviderID gets the node name by provider ID.
func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
	// NodeName is not part of providerID for vmss instances.
	parts := strings.Split(providerID, "/")
	instanceID := parts[len(parts)-1]
	machine, exist, err := ss.getScaleSetVMByID(instanceID, ss.PrimaryScaleSetName)
	if !exist {
		return "", cloudprovider.InstanceNotFound
	}
	if err != nil {
		glog.Errorf("error: az.getIPForVmssMachine(%s), az.getVmssVirtualMachine(%s), err=%v", nodeName, nodeName, err)
		return "", err
	}

	nicID, err := getPrimaryInterfaceIDForVmssMachine(machine)
	return types.NodeName(*machine.OsProfile.ComputerName), nil
}

// GetInstanceTypeByNodeName gets the instance type by node name.
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
	instanceType, err := ss.getScaleSetInstanceTypeByNodeName(name)
	if err != nil {
		glog.Errorf("error: az.getIPForVmssMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err)
		if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
			// Retry with standard type because master nodes may not belong to any vmss.
			// TODO: find a better way to identify the type of VM.
			return ss.availabilitySet.GetInstanceTypeByNodeName(name)
		}

		return "", err
	}

	nicName, err := getLastSegment(nicID)
	return instanceType, nil
}

func (ss *scaleSet) getScaleSetInstanceTypeByNodeName(name string) (string, error) {
	machine, exists, err := ss.getScaleSetVM(name, ss.PrimaryScaleSetName)
	if err != nil {
		glog.Errorf("error: az.getIPForVmssMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err)
		glog.Errorf("error: ss.getScaleSetInstanceTypeByNodeName(%s), ss.getScaleSetVM(%s) err=%v", name, name, err)
		return "", err
	} else if !exists {
		return "", cloudprovider.InstanceNotFound
	}

	az.operationPollRateLimiter.Accept()
	glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
	nic, err := az.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(az.ResourceGroup, az.Config.PrimaryScaleSetName, *machine.InstanceID, nicName, "")
	glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
	if machine.Sku.Name != nil {
		return *machine.Sku.Name, nil
	}

	return "", fmt.Errorf("instance type is not defined")
}

// GetZoneByNodeName gets cloudprovider.Zone by node name.
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
	instanceID, err := getScaleSetVMInstanceID(name)
	if err != nil {
		glog.Errorf("error: az.getIPForVmssMachine(%s), az.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, az.ResourceGroup, nicName, "", err)
		if err == ErrorNotVmssInstance {
			// Retry with standard type because master nodes may not belong to any vmss.
			// TODO: find a better way to identify the type of VM.
			return ss.availabilitySet.GetZoneByNodeName(name)
		}
		return cloudprovider.Zone{}, err
	}

	vm, err := ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID)
	if err != nil {
		return cloudprovider.Zone{}, err
	}

	// PlatformFaultDomain is not included in VirtualMachineScaleSetVM, so we get it from VirtualMachineScaleSetVMInstanceView.
	vmView, err := ss.VirtualMachineScaleSetVMsClient.GetInstanceView(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID)
	if err != nil {
		return cloudprovider.Zone{}, err
	}

	failureDomain := strconv.Itoa(int(*vmView.PlatformFaultDomain))
	zone := cloudprovider.Zone{
		FailureDomain: failureDomain,
		Region:        *(vm.Location),
	}
	return zone, nil
}

// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType.
func (ss *scaleSet) GetPrimaryVMSetName() string {
	return ss.Config.PrimaryScaleSetName
}

// GetIPByNodeName gets machine IP by node name.
func (ss *scaleSet) GetIPByNodeName(nodeName, vmSetName string) (string, error) {
	nic, err := ss.GetPrimaryInterface(nodeName, vmSetName)
	if err != nil {
		glog.Errorf("error: ss.GetIPByNodeName(%s), GetPrimaryInterface(%q, %q), err=%v", nodeName, nodeName, vmSetName, err)
		return "", err
	}

	ipConfig, err := getPrimaryIPConfig(nic)
	if err != nil {
		glog.Errorf("error: az.getIPForVmssMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err)
		glog.Errorf("error: ss.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err)
		return "", err
	}

@ -70,7 +241,7 @@ func (az *Cloud) getIPForVmssMachine(nodeName types.NodeName) (string, error) {
}

// This returns the full identifier of the primary NIC for the given VM.
func getPrimaryInterfaceIDForVmssMachine(machine compute.VirtualMachineScaleSetVM) (string, error) {
func (ss *scaleSet) getPrimaryInterfaceID(machine compute.VirtualMachineScaleSetVM) (string, error) {
	if len(*machine.NetworkProfile.NetworkInterfaces) == 1 {
		return *(*machine.NetworkProfile.NetworkInterfaces)[0].ID, nil
	}
@ -87,7 +258,7 @@ func getPrimaryInterfaceIDForVmssMachine(machine compute.VirtualMachineScaleSetV
// machineName is composed of computerNamePrefix and a base-36 instanceID.
// The instanceID part has a fixed length of 6 characters.
// Refer to https://msftstack.wordpress.com/2017/05/10/figuring-out-azure-vm-scale-set-machine-names/.
func getVmssInstanceID(machineName string) (string, error) {
func getScaleSetVMInstanceID(machineName string) (string, error) {
	nameLength := len(machineName)
	if nameLength < 6 {
		return "", ErrorNotVmssInstance
@ -100,3 +271,563 @@ func getVmssInstanceID(machineName string) (string, error) {

	return fmt.Sprintf("%d", instanceID), nil
}
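
For intuition, a minimal standalone sketch of the base-36 conversion described in the comment above (the machine name and the main scaffolding are hypothetical, not part of this diff):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Hypothetical VMSS machine name: computerNamePrefix plus a 6-character base-36 suffix.
	machineName := "vmss-agent-000023"
	suffix := machineName[len(machineName)-6:] // "000023"
	// Parse the suffix as a base-36 number, mirroring getScaleSetVMInstanceID.
	instanceID, err := strconv.ParseInt(suffix, 36, 64)
	if err != nil {
		panic(err)
	}
	fmt.Println(instanceID) // 75, i.e. 2*36 + 3
}
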
// extractScaleSetNameByVMID extracts the scaleset name by scaleSetVirtualMachine's ID.
func extractScaleSetNameByVMID(vmID string) (string, error) {
	matches := scaleSetNameRE.FindStringSubmatch(vmID)
	if len(matches) != 2 {
		return "", ErrorNotVmssInstance
	}

	return matches[1], nil
}
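
A quick illustration of what scaleSetNameRE captures (the resource ID below is made up for the example):

package main

import (
	"fmt"
	"regexp"
)

var scaleSetNameRE = regexp.MustCompile(`^/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)

func main() {
	// Hypothetical Azure resource ID of a scale set VM.
	vmID := "/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Compute/virtualMachineScaleSets/agentpool1/virtualMachines/3"
	matches := scaleSetNameRE.FindStringSubmatch(vmID)
	if len(matches) == 2 {
		fmt.Println(matches[1]) // agentpool1
	}
}
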
// listScaleSetsWithRetry lists scale sets with exponential backoff retry.
func (ss *scaleSet) listScaleSetsWithRetry() ([]string, error) {
	var err error
	var result compute.VirtualMachineScaleSetListResult
	allScaleSets := make([]string, 0)

	backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
		ss.operationPollRateLimiter.Accept()
		glog.V(10).Infof("VirtualMachineScaleSetsClient.List start for %v", ss.ResourceGroup)
		result, err = ss.VirtualMachineScaleSetsClient.List(ss.ResourceGroup)
		glog.V(10).Infof("VirtualMachineScaleSetsClient.List end for %v", ss.ResourceGroup)
		if err != nil {
			glog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", ss.ResourceGroup, err)
			return false, err
		}

		return true, nil
	})
	if backoffError != nil {
		return nil, backoffError
	}

	appendResults := (result.Value != nil && len(*result.Value) > 0)
	for appendResults {
		for _, scaleSet := range *result.Value {
			allScaleSets = append(allScaleSets, *scaleSet.Name)
		}
		appendResults = false

		if result.NextLink != nil {
			backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
				ss.operationPollRateLimiter.Accept()
				glog.V(10).Infof("VirtualMachineScaleSetsClient.ListNextResults start for %v", ss.ResourceGroup)
				result, err = ss.VirtualMachineScaleSetsClient.ListNextResults(result)
				glog.V(10).Infof("VirtualMachineScaleSetsClient.ListNextResults end for %v", ss.ResourceGroup)
				if err != nil {
					glog.Errorf("VirtualMachineScaleSetsClient.ListNextResults for %v failed: %v", ss.ResourceGroup, err)
					return false, err
				}

				return true, nil
			})
			if backoffError != nil {
				return nil, backoffError
			}

			appendResults = (result.Value != nil && len(*result.Value) > 0)
		}
	}

	return allScaleSets, nil
}

// listScaleSetVMsWithRetry lists VMs belonging to the specified scale set with exponential backoff retry.
func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.VirtualMachineScaleSetVM, error) {
	var err error
	var result compute.VirtualMachineScaleSetVMListResult
	allVMs := make([]compute.VirtualMachineScaleSetVM, 0)

	backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
		ss.operationPollRateLimiter.Accept()
		glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List start for %v", scaleSetName)
		result, err = ss.VirtualMachineScaleSetVMsClient.List(ss.ResourceGroup, scaleSetName, "", "", "")
		glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List end for %v", scaleSetName)
		if err != nil {
			glog.Errorf("VirtualMachineScaleSetVMsClient.List for %v failed: %v", scaleSetName, err)
			return false, err
		}

		return true, nil
	})
	if backoffError != nil {
		return nil, backoffError
	}

	appendResults := (result.Value != nil && len(*result.Value) > 0)
	for appendResults {
		allVMs = append(allVMs, *result.Value...)
		appendResults = false

		if result.NextLink != nil {
			backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
				ss.operationPollRateLimiter.Accept()
				glog.V(10).Infof("VirtualMachineScaleSetVMsClient.ListNextResults start for %v", scaleSetName)
				result, err = ss.VirtualMachineScaleSetVMsClient.ListNextResults(result)
				glog.V(10).Infof("VirtualMachineScaleSetVMsClient.ListNextResults end for %v", scaleSetName)
				if err != nil {
					glog.Errorf("VirtualMachineScaleSetVMsClient.ListNextResults for %v failed: %v", scaleSetName, err)
					return false, err
				}

				return true, nil
			})
			if backoffError != nil {
				return nil, backoffError
			}

			appendResults = (result.Value != nil && len(*result.Value) > 0)
		}
	}

	return allVMs, nil
}

// getAgentPoolScaleSets lists the virtual machines for the resource group and then builds
// a list of scale sets that match the nodes available to k8s.
func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
	scaleSetNames, err := ss.listScaleSetsWithRetry()
	if err != nil {
		return nil, err
	}

	vmNameToScaleSetName := make(map[string]string, len(scaleSetNames))
	for _, scaleSetName := range scaleSetNames {
		vms, err := ss.listScaleSetVMsWithRetry(scaleSetName)
		if err != nil {
			return nil, err
		}

		for idx := range vms {
			vm := vms[idx]
			if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil {
				vmNameToScaleSetName[*vm.OsProfile.ComputerName] = scaleSetName
			}
		}
	}

	agentPoolScaleSets := &[]string{}
	availableScaleSetNames := sets.NewString()
	for nx := range nodes {
		if isMasterNode(nodes[nx]) {
			continue
		}

		nodeName := nodes[nx].Name
		ssName, ok := vmNameToScaleSetName[nodeName]
		if !ok {
			// TODO: support master nodes not managed by VMSS.
			glog.Errorf("Node %q does not belong to any known scale sets", nodeName)
			return nil, fmt.Errorf("node %q does not belong to any known scale sets", nodeName)
		}

		// Deduplicate scale set names across nodes.
		if availableScaleSetNames.Has(ssName) {
			continue
		}
		availableScaleSetNames.Insert(ssName)

		*agentPoolScaleSets = append(*agentPoolScaleSets, ssName)
	}

	return agentPoolScaleSets, nil
}

// GetVMSetNames selects all possible availability sets or scale sets
// (depending on the configured vmType) for the service load balancer. If the service has
// no loadbalancer mode annotation, it returns the primary VMSet. If the service annotation
// for the loadbalancer exists, it returns the eligible VMSets.
func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetNames *[]string, err error) {
	hasMode, isAuto, serviceVMSetNames := getServiceLoadBalancerMode(service)
	if !hasMode {
		// No mode specified in the service annotation; default to PrimaryScaleSetName.
		scaleSetNames := &[]string{ss.Config.PrimaryScaleSetName}
		return scaleSetNames, nil
	}

	scaleSetNames, err := ss.getAgentPoolScaleSets(nodes)
	if err != nil {
		glog.Errorf("ss.GetVMSetNames - getAgentPoolScaleSets failed err=(%v)", err)
		return nil, err
	}
	if len(*scaleSetNames) == 0 {
		glog.Errorf("ss.GetVMSetNames - No scale sets found for nodes in the cluster, node count(%d)", len(nodes))
		return nil, fmt.Errorf("no scale sets found for nodes, node count(%d)", len(nodes))
	}

	// Sort the list to have a deterministic selection.
	sort.Strings(*scaleSetNames)

	if !isAuto {
		if serviceVMSetNames == nil || len(serviceVMSetNames) == 0 {
			return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or a list of scale set names")
		}
		// Validate that each requested scale set exists.
		for sasx := range serviceVMSetNames {
			found := false
			for asx := range *scaleSetNames {
				if strings.EqualFold((*scaleSetNames)[asx], serviceVMSetNames[sasx]) {
					found = true
					serviceVMSetNames[sasx] = (*scaleSetNames)[asx]
					break
				}
			}
			if !found {
				glog.Errorf("ss.GetVMSetNames - scale set (%s) in service annotation not found", serviceVMSetNames[sasx])
				return nil, fmt.Errorf("scale set (%s) - not found", serviceVMSetNames[sasx])
			}
		}
		vmSetNames = &serviceVMSetNames
	}

	return vmSetNames, nil
}
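
As a usage note, the mode consumed by getServiceLoadBalancerMode is the same loadbalancer mode annotation the availability-set path already honors; a hedged sketch (annotation key taken from the existing load balancer code, scale set names hypothetical):

package main

import (
	v1 "k8s.io/api/core/v1"
)

func main() {
	svc := &v1.Service{}
	svc.Annotations = map[string]string{
		// "__auto__" selects all eligible scale sets; a comma-separated
		// list pins the load balancer to specific ones.
		"service.beta.kubernetes.io/azure-load-balancer-mode": "agentpool1,agentpool2",
	}
	_ = svc
}
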
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) {
	ss.operationPollRateLimiter.Accept()
	machine, exists, err := ss.getScaleSetVM(nodeName, vmSetName)
	if !exists || err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
		// Retry with standard type because master nodes may not belong to any vmss.
		// TODO: find a better way to identify the type of VM.
		return ss.availabilitySet.GetPrimaryInterface(nodeName, "")
	}
	if err != nil {
		glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getScaleSetVM(%s), err=%v", nodeName, nodeName, err)
		return network.Interface{}, err
	}

	nicID, err := ss.getPrimaryInterfaceID(machine)
	if err != nil {
		glog.Errorf("error: ss.GetPrimaryInterface(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err)
		return network.Interface{}, err
	}

	nicName, err := getLastSegment(nicID)
	if err != nil {
		glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, nicID, err)
		return network.Interface{}, err
	}

	ss.operationPollRateLimiter.Accept()
	glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
	nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, *machine.InstanceID, nicName, "")
	glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
	if err != nil {
		glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, nicName, "", err)
		return network.Interface{}, err
	}

	// Fix the interface's location, which is required when updating the interface.
	// TODO: is this a bug in the Azure SDK?
	if nic.Location == nil || *nic.Location == "" {
		nic.Location = &ss.Config.Location
	}

	return nic, nil
}

// getScaleSet gets a scale set by name.
func (ss *scaleSet) getScaleSet(name string) (compute.VirtualMachineScaleSet, bool, error) {
	ss.operationPollRateLimiter.Accept()
	glog.V(10).Infof("VirtualMachineScaleSetsClient.Get(%s): start", name)
	result, err := ss.VirtualMachineScaleSetsClient.Get(ss.ResourceGroup, name)
	glog.V(10).Infof("VirtualMachineScaleSetsClient.Get(%s): end", name)

	exists, realErr := checkResourceExistsFromError(err)
	if realErr != nil {
		return result, false, realErr
	}

	if !exists {
		return result, false, nil
	}

	return result, exists, err
}

// getScaleSetWithRetry gets a scale set with exponential backoff retry.
func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineScaleSet, bool, error) {
	var result compute.VirtualMachineScaleSet
	var exists bool

	err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
		var retryErr error
		result, exists, retryErr = ss.getScaleSet(name)
		if retryErr != nil {
			glog.Errorf("backoff: failure, will retry, err=%v", retryErr)
			return false, nil
		}
		glog.V(2).Infof("backoff: success")
		return true, nil
	})

	return result, exists, err
}

// getPrimaryNetworkConfiguration gets primary network interface configuration for scale sets.
func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) {
	networkConfigurations := *networkConfigurationList
	if len(networkConfigurations) == 1 {
		return &networkConfigurations[0], nil
	}

	for idx := range networkConfigurations {
		networkConfig := &networkConfigurations[idx]
		if networkConfig.Primary != nil && *networkConfig.Primary {
			return networkConfig, nil
		}
	}

	return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", scaleSetName)
}

func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
	ipConfigurations := *config.IPConfigurations
	if len(ipConfigurations) == 1 {
		return &ipConfigurations[0], nil
	}

	for idx := range ipConfigurations {
		ipConfig := &ipConfigurations[idx]
		if ipConfig.Primary != nil && *ipConfig.Primary {
			return ipConfig, nil
		}
	}

	return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set %q", scaleSetName)
}

// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSSWithRetry(virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
	return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
		ss.operationPollRateLimiter.Accept()
		glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): start", *virtualMachineScaleSet.Name)
		respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet, nil)
		resp := <-respChan
		err := <-errChan
		glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
		return processRetryResponse(resp.Response, err)
	})
}

// updateVMSSInstancesWithRetry invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry.
func (ss *scaleSet) updateVMSSInstancesWithRetry(scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
	return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
		ss.operationPollRateLimiter.Accept()
		glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): start", scaleSetName)
		respChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, scaleSetName, vmInstanceIDs, nil)
		resp := <-respChan
		err := <-errChan
		glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): end", scaleSetName)
		return processRetryResponse(resp.Response, err)
	})
}
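
Both retry helpers follow the long-running-operation convention of this azure-sdk-for-go vintage: the call returns a response channel and an error channel, and each must be received from once. A hedged, generic sketch of that pattern (the helper name and the simplified types are illustrative, not part of this diff):

// drain is a hypothetical illustration of the channel-draining convention used above.
func drain(respChan <-chan autorest.Response, errChan <-chan error) (autorest.Response, error) {
	resp := <-respChan // blocks until the long-running operation completes
	err := <-errChan   // the error channel must be read as well
	return resp, err
}
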
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error {
	virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName)
	if err != nil {
		glog.Errorf("ss.getScaleSetWithRetry(%s) for service %q failed: %v", vmSetName, serviceName, err)
		return err
	}
	if !exists {
		errorMessage := fmt.Errorf("scale set %q not found", vmSetName)
		glog.Errorf("%v", errorMessage)
		return errorMessage
	}

	// Find the primary network interface configuration.
	networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
	primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName)
	if err != nil {
		return err
	}

	// Find the primary IP configuration.
	primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName)
	if err != nil {
		return err
	}

	// Update the primary IP configuration's LoadBalancerBackendAddressPools.
	foundPool := false
	newBackendPools := []compute.SubResource{}
	if primaryIPConfiguration.LoadBalancerBackendAddressPools != nil {
		newBackendPools = *primaryIPConfiguration.LoadBalancerBackendAddressPools
	}
	for _, existingPool := range newBackendPools {
		if strings.EqualFold(backendPoolID, *existingPool.ID) {
			foundPool = true
			break
		}
	}
	if !foundPool {
		newBackendPools = append(newBackendPools,
			compute.SubResource{
				ID: to.StringPtr(backendPoolID),
			})
		primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools

		glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating", serviceName, vmSetName)
		ss.operationPollRateLimiter.Accept()
		glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): start", vmSetName)
		respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, vmSetName, virtualMachineScaleSet, nil)
		resp := <-respChan
		err := <-errChan
		glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
		if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
			glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
			retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
			if retryErr != nil {
				err = retryErr
				glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s) abort backoff: scale set (%s) - updating", serviceName, vmSetName)
			}
		}
		if err != nil {
			return err
		}
	}

	// Construct instanceIDs from nodes.
	instanceIDs := []string{}
	for _, curNode := range nodes {
		curScaleSetName, err := extractScaleSetNameByVMID(curNode.Spec.ExternalID)
		if err != nil {
			glog.V(2).Infof("Node %q does not belong to any scale sets, omitting it", curNode.Name)
			continue
		}
		if curScaleSetName != vmSetName {
			glog.V(2).Infof("Node %q does not belong to scale set %q, omitting it", curNode.Name, vmSetName)
			continue
		}

		instanceID, err := getLastSegment(curNode.Spec.ExternalID)
		if err != nil {
			glog.Errorf("Failed to get last segment from %q: %v", curNode.Spec.ExternalID, err)
			return err
		}

		instanceIDs = append(instanceIDs, instanceID)
	}

	// Update instances to the latest VMSS model.
	vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
		InstanceIds: &instanceIDs,
	}
	ss.operationPollRateLimiter.Accept()
	glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): start", vmSetName)
	respChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, vmSetName, vmInstanceIDs, nil)
	resp := <-respChan
	err = <-errChan
	glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", vmSetName)
	if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
		glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
		retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs)
		if retryErr != nil {
			err = retryErr
			glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%s) abort backoff: scale set (%s) - updating", serviceName, vmSetName)
		}
	}
	if err != nil {
		return err
	}

	return nil
}

// EnsureBackendPoolDeleted ensures the load balancer's backendAddressPools are deleted from the specified vmSet.
func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
	virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName)
	if err != nil {
		glog.Errorf("ss.EnsureBackendPoolDeleted(%s, %s) getScaleSetWithRetry(%s) failed: %v", poolID, vmSetName, vmSetName, err)
		return err
	}
	if !exists {
		glog.V(2).Infof("ss.EnsureBackendPoolDeleted(%s, %s), scale set %s no longer exists", poolID, vmSetName, vmSetName)
		return nil
	}

	// Find the primary network interface configuration.
	networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
	primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName)
	if err != nil {
		return err
	}

	// Find the primary IP configuration.
	primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName)
	if err != nil {
		return err
	}

	// Construct new loadBalancerBackendAddressPools and remove backendAddressPools from the primary IP configuration.
	if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 {
		return nil
	}
	existingBackendPools := *primaryIPConfiguration.LoadBalancerBackendAddressPools
	newBackendPools := []compute.SubResource{}
	foundPool := false
	for i := len(existingBackendPools) - 1; i >= 0; i-- {
		curPool := existingBackendPools[i]
		if strings.EqualFold(poolID, *curPool.ID) {
			glog.V(10).Infof("EnsureBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, vmSetName)
			foundPool = true
			newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...)
		}
	}
	if !foundPool {
		// Pool not found, assume it has already been removed.
		return nil
	}

	// Update the scale set with backoff.
	primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
	glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", vmSetName)
	ss.operationPollRateLimiter.Accept()
	glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): start", vmSetName)
	respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, vmSetName, virtualMachineScaleSet, nil)
	resp := <-respChan
	err = <-errChan
	glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
	if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
		glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", vmSetName, err)
		retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
		if retryErr != nil {
			err = retryErr
			glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", vmSetName)
		}
	}
	if err != nil {
		return err
	}

	// Update instances to the latest VMSS model. The "*" instance ID applies
	// the update to every instance in the scale set.
	instanceIDs := []string{"*"}
	vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
		InstanceIds: &instanceIDs,
	}
	ss.operationPollRateLimiter.Accept()
	glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): start", vmSetName)
	updateRespChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, vmSetName, vmInstanceIDs, nil)
	updateResp := <-updateRespChan
	err = <-errChan
	glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", vmSetName)
	if ss.CloudProviderBackoff && shouldRetryAPIRequest(updateResp.Response, err) {
		glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances scale set (%s) - updating, err=%v", vmSetName, err)
		retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs)
		if retryErr != nil {
			err = retryErr
			glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances abort backoff: scale set (%s) - updating", vmSetName)
		}
	}
	if err != nil {
		return err
	}

	return nil
}

pkg/cloudprovider/providers/azure/azure_vmsets.go (new file, 59 lines)
@ -0,0 +1,59 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package azure

import (
	"github.com/Azure/azure-sdk-for-go/arm/network"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/cloudprovider"
)

// VMSet defines the functions that all vmsets (including scale sets and
// availability sets) must implement.
type VMSet interface {
	// GetInstanceIDByNodeName gets the cloud provider ID by node name.
	// It must return ("", cloudprovider.InstanceNotFound) if the instance does
	// not exist or is no longer running.
	GetInstanceIDByNodeName(name string) (string, error)
	// GetInstanceTypeByNodeName gets the instance type by node name.
	GetInstanceTypeByNodeName(name string) (string, error)
	// GetIPByNodeName gets machine IP by node name.
	GetIPByNodeName(name, vmSetName string) (string, error)
	// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
	GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error)
	// GetNodeNameByProviderID gets the node name by provider ID.
	GetNodeNameByProviderID(providerID string) (types.NodeName, error)

	// GetZoneByNodeName gets cloudprovider.Zone by node name.
	GetZoneByNodeName(name string) (cloudprovider.Zone, error)

	// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
	// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType.
	GetPrimaryVMSetName() string
	// GetVMSetNames selects all possible availability sets or scale sets
	// (depending on the configured vmType) for the service load balancer. If the service has
	// no loadbalancer mode annotation, it returns the primary VMSet. If the service annotation
	// for the loadbalancer exists, it returns the eligible VMSets.
	GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error)
	// EnsureHostsInPool ensures the given Node's primary IP configurations are
	// participating in the specified LoadBalancer Backend Pool.
	EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error
	// EnsureBackendPoolDeleted ensures the load balancer's backendAddressPools are deleted from the specified vmSet.
	EnsureBackendPoolDeleted(poolID, vmSetName string) error
}
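
For orientation, a hedged sketch of how a Cloud could pick its VMSet implementation from the configured vmType (the helper name and the literal "vmss" are assumptions for illustration; the actual wiring happens in NewCloud):

// newVMSet is a hypothetical helper illustrating the dispatch; not part of this diff.
func newVMSet(az *Cloud) VMSet {
	if strings.EqualFold(az.Config.VMType, "vmss") {
		return newScaleSet(az)
	}
	// Standard VMs fall back to the availability set implementation.
	return newAvailabilitySet(az)
}
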
@ -17,19 +17,14 @@ limitations under the License.
package azure

import (
	"errors"
	"net/http"

	"github.com/Azure/azure-sdk-for-go/arm/compute"
	"github.com/Azure/azure-sdk-for-go/arm/network"
	"github.com/Azure/go-autorest/autorest"
	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/types"
)

var (
	// ErrorNotVmssInstance indicates an instance does not belong to any vmss.
	ErrorNotVmssInstance = errors.New("not a vmss instance")
	"k8s.io/apimachinery/pkg/types"
)

// checkExistsFromError inspects an error and returns a true if err is nil,
@ -80,32 +75,6 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM
	return vm, exists, err
}

func (az *Cloud) getVmssVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) {
	var realErr error

	vmName := string(nodeName)
	instanceID, err := getVmssInstanceID(vmName)
	if err != nil {
		return vm, false, err
	}

	az.operationPollRateLimiter.Accept()
	glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): start", vmName)
	vm, err = az.VirtualMachineScaleSetVMsClient.Get(az.ResourceGroup, az.PrimaryScaleSetName, instanceID)
	glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): end", vmName)

	exists, realErr = checkResourceExistsFromError(err)
	if realErr != nil {
		return vm, false, realErr
	}

	if !exists {
		return vm, false, nil
	}

	return vm, exists, err
}

func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) {
	var realErr error

@ -21,13 +21,10 @@ import (
	"io"
	"io/ioutil"
	"net/http"
	"strconv"
	"sync"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/cloudprovider"

	"github.com/Azure/azure-sdk-for-go/arm/compute"
)

const instanceInfoURL = "http://169.254.169.254/metadata/v1/InstanceInfo"

@ -63,10 +60,11 @@ func (az *Cloud) GetZone() (cloudprovider.Zone, error) {
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (az *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
	nodeName, err := splitProviderID(providerID)
	nodeName, err := az.vmSet.GetNodeNameByProviderID(providerID)
	if err != nil {
		return cloudprovider.Zone{}, err
	}

	return az.GetZoneByNodeName(nodeName)
}

@ -74,20 +72,7 @@ func (az *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, err
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (az *Cloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
	vm, err := az.VirtualMachinesClient.Get(az.ResourceGroup, string(nodeName), compute.InstanceView)
	if err != nil {
		return cloudprovider.Zone{}, err
	}

	failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))

	zone := cloudprovider.Zone{
		FailureDomain: failureDomain,
		Region:        *(vm.Location),
	}
	return zone, nil
	return az.vmSet.GetZoneByNodeName(string(nodeName))
}

func fetchFaultDomain() (*string, error) {