diff --git a/pkg/cloudprovider/providers/azure/BUILD b/pkg/cloudprovider/providers/azure/BUILD index a72de9f2a6d..322024f2c61 100644 --- a/pkg/cloudprovider/providers/azure/BUILD +++ b/pkg/cloudprovider/providers/azure/BUILD @@ -24,6 +24,7 @@ go_library( "azure_storageaccount.go", "azure_util.go", "azure_util_vmss.go", + "azure_vmsets.go", "azure_wrap.go", "azure_zones.go", ], diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index d60e11423f8..2d8322ee58b 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -198,6 +198,7 @@ type Cloud struct { operationPollRateLimiter flowcontrol.RateLimiter resourceRequestBackoff wait.Backoff metadata *InstanceMetadata + vmSet VMSet // Clients for vmss. VirtualMachineScaleSetsClient compute.VirtualMachineScaleSetsClient @@ -346,16 +347,16 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az.SecurityGroupsClient = securityGroupsClient virtualMachineScaleSetVMsClient := compute.NewVirtualMachineScaleSetVMsClient(az.SubscriptionID) - az.VirtualMachineScaleSetVMsClient.BaseURI = az.Environment.ResourceManagerEndpoint - az.VirtualMachineScaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken) - az.VirtualMachineScaleSetVMsClient.PollingDelay = 5 * time.Second + virtualMachineScaleSetVMsClient.BaseURI = az.Environment.ResourceManagerEndpoint + virtualMachineScaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken) + virtualMachineScaleSetVMsClient.PollingDelay = 5 * time.Second configureUserAgent(&virtualMachineScaleSetVMsClient.Client) az.VirtualMachineScaleSetVMsClient = virtualMachineScaleSetVMsClient virtualMachineScaleSetsClient := compute.NewVirtualMachineScaleSetsClient(az.SubscriptionID) - az.VirtualMachineScaleSetsClient.BaseURI = az.Environment.ResourceManagerEndpoint - az.VirtualMachineScaleSetsClient.Authorizer = 
autorest.NewBearerAuthorizer(servicePrincipalToken) - az.VirtualMachineScaleSetsClient.PollingDelay = 5 * time.Second + virtualMachineScaleSetsClient.BaseURI = az.Environment.ResourceManagerEndpoint + virtualMachineScaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(servicePrincipalToken) + virtualMachineScaleSetsClient.PollingDelay = 5 * time.Second configureUserAgent(&virtualMachineScaleSetsClient.Client) az.VirtualMachineScaleSetsClient = virtualMachineScaleSetsClient @@ -421,6 +422,12 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { az.MaximumLoadBalancerRuleCount = maximumLoadBalancerRuleCount } + if az.Config.VMType == vmTypeVMSS { + az.vmSet = newScaleSet(&az) + } else { + az.vmSet = newAvailabilitySet(&az) + } + if err := initDiskControllers(&az); err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go index 6f5e41349db..3cf5d8930fa 100644 --- a/pkg/cloudprovider/providers/azure/azure_backoff.go +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -58,13 +58,13 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.Virtua return machine, exists, err } -// GetScaleSetsVMWithRetry invokes az.getScaleSetsVM with exponential backoff retry -func (az *Cloud) GetScaleSetsVMWithRetry(name types.NodeName) (compute.VirtualMachineScaleSetVM, bool, error) { +// GetScaleSetsVMWithRetry invokes ss.getScaleSetVM with exponential backoff retry +func (ss *scaleSet) GetScaleSetsVMWithRetry(name types.NodeName, scaleSetName string) (compute.VirtualMachineScaleSetVM, bool, error) { var machine compute.VirtualMachineScaleSetVM var exists bool - err := wait.ExponentialBackoff(az.resourceRequestBackoff, func() (bool, error) { + err := wait.ExponentialBackoff(ss.resourceRequestBackoff, func() (bool, error) { var retryErr error - machine, exists, retryErr = az.getVmssVirtualMachine(name) + machine, exists, retryErr = 
ss.getScaleSetVM(string(name), scaleSetName) if retryErr != nil { glog.Errorf("GetScaleSetsVMWithRetry backoff: failure, will retry,err=%v", retryErr) return false, nil diff --git a/pkg/cloudprovider/providers/azure/azure_instances.go b/pkg/cloudprovider/providers/azure/azure_instances.go index 8378e596a9d..9c5976c3dfc 100644 --- a/pkg/cloudprovider/providers/azure/azure_instances.go +++ b/pkg/cloudprovider/providers/azure/azure_instances.go @@ -22,7 +22,6 @@ import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/cloudprovider" - "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" ) @@ -48,6 +47,7 @@ func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) { } return addresses, nil } + ip, err := az.GetIPForMachineWithRetry(name) if err != nil { glog.V(2).Infof("NodeAddresses(%s) abort backoff", name) @@ -64,7 +64,7 @@ func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) { // This method will not be called from the node that is requesting this ID. i.e. metadata service // and other local methods cannot be used here func (az *Cloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) { - name, err := splitProviderID(providerID) + name, err := az.vmSet.GetNodeNameByProviderID(providerID) if err != nil { return nil, err } @@ -80,7 +80,7 @@ func (az *Cloud) ExternalID(name types.NodeName) (string, error) { // InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running. // If false is returned with no error, the instance will be immediately deleted by the cloud controller manager. 
func (az *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) { - name, err := splitProviderID(providerID) + name, err := az.vmSet.GetNodeNameByProviderID(providerID) if err != nil { return false, err } @@ -118,70 +118,14 @@ func (az *Cloud) InstanceID(name types.NodeName) (string, error) { } } - if az.Config.VMType == vmTypeVMSS { - id, err := az.getVmssInstanceID(name) - if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { - // Retry with standard type because master nodes may not belong to any vmss. - return az.getStandardInstanceID(name) - } - - return id, err - } - - return az.getStandardInstanceID(name) -} - -func (az *Cloud) getVmssInstanceID(name types.NodeName) (string, error) { - var machine compute.VirtualMachineScaleSetVM - var exists bool - var err error - az.operationPollRateLimiter.Accept() - machine, exists, err = az.getVmssVirtualMachine(name) - if err != nil { - if az.CloudProviderBackoff { - glog.V(2).Infof("InstanceID(%s) backing off", name) - machine, exists, err = az.GetScaleSetsVMWithRetry(name) - if err != nil { - glog.V(2).Infof("InstanceID(%s) abort backoff", name) - return "", err - } - } else { - return "", err - } - } else if !exists { - return "", cloudprovider.InstanceNotFound - } - return *machine.ID, nil -} - -func (az *Cloud) getStandardInstanceID(name types.NodeName) (string, error) { - var machine compute.VirtualMachine - var exists bool - var err error - az.operationPollRateLimiter.Accept() - machine, exists, err = az.getVirtualMachine(name) - if err != nil { - if az.CloudProviderBackoff { - glog.V(2).Infof("InstanceID(%s) backing off", name) - machine, exists, err = az.GetVirtualMachineWithRetry(name) - if err != nil { - glog.V(2).Infof("InstanceID(%s) abort backoff", name) - return "", err - } - } else { - return "", err - } - } else if !exists { - return "", cloudprovider.InstanceNotFound - } - return *machine.ID, nil + return az.vmSet.GetInstanceIDByNodeName(string(name)) } // 
InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID // This method will not be called from the node that is requesting this ID. i.e. metadata service // and other local methods cannot be used here func (az *Cloud) InstanceTypeByProviderID(providerID string) (string, error) { - name, err := splitProviderID(providerID) + name, err := az.vmSet.GetNodeNameByProviderID(providerID) if err != nil { return "", err } @@ -207,46 +151,7 @@ func (az *Cloud) InstanceType(name types.NodeName) (string, error) { } } - if az.Config.VMType == vmTypeVMSS { - machineType, err := az.getVmssInstanceType(name) - if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { - // Retry with standard type because master nodes may not belong to any vmss. - return az.getStandardInstanceType(name) - } - - return machineType, err - } - - return az.getStandardInstanceType(name) -} - -// getVmssInstanceType gets instance with type vmss. -func (az *Cloud) getVmssInstanceType(name types.NodeName) (string, error) { - machine, exists, err := az.getVmssVirtualMachine(name) - if err != nil { - glog.Errorf("error: az.InstanceType(%s), az.getVmssVirtualMachine(%s) err=%v", name, name, err) - return "", err - } else if !exists { - return "", cloudprovider.InstanceNotFound - } - - if machine.Sku.Name != nil { - return *machine.Sku.Name, nil - } - - return "", fmt.Errorf("instance type is not set") -} - -// getStandardInstanceType gets instance with standard type. 
-func (az *Cloud) getStandardInstanceType(name types.NodeName) (string, error) { - machine, exists, err := az.getVirtualMachine(name) - if err != nil { - glog.Errorf("error: az.InstanceType(%s), az.getVirtualMachine(%s) err=%v", name, name, err) - return "", err - } else if !exists { - return "", cloudprovider.InstanceNotFound - } - return string(machine.HardwareProfile.VMSize), nil + return az.vmSet.GetInstanceTypeByNodeName(string(name)) } // AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances @@ -255,8 +160,8 @@ func (az *Cloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { return fmt.Errorf("not supported") } -// CurrentNodeName returns the name of the node we are currently running on -// On most clouds (e.g. GCE) this is the hostname, so we provide the hostname +// CurrentNodeName returns the name of the node we are currently running on. +// On Azure this is the hostname, so we just return the hostname. func (az *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) { return types.NodeName(hostname), nil } diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index c1bc5bf972f..6657a074d89 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -23,49 +23,48 @@ import ( "strings" "k8s.io/api/core/v1" - utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" serviceapi "k8s.io/kubernetes/pkg/api/v1/service" - "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" "github.com/Azure/go-autorest/autorest/to" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/types" ) -// ServiceAnnotationLoadBalancerInternal is the annotation used on the service -const ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal" +const ( + // 
ServiceAnnotationLoadBalancerInternal is the annotation used on the service + ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal" -// ServiceAnnotationLoadBalancerInternalSubnet is the annotation used on the service -// to specify what subnet it is exposed on -const ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet" + // ServiceAnnotationLoadBalancerInternalSubnet is the annotation used on the service + // to specify what subnet it is exposed on + ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet" -// ServiceAnnotationLoadBalancerMode is the annotation used on the service to specify the -// Azure load balancer selection based on availability sets -// There are currently three possible load balancer selection modes : -// 1. Default mode - service has no annotation ("service.beta.kubernetes.io/azure-load-balancer-mode") -// In this case the Loadbalancer of the primary Availability set is selected -// 2. "__auto__" mode - service is annotated with __auto__ value, this when loadbalancer from any availability set -// is selected which has the miinimum rules associated with it. -// 3. "as1,as2" mode - this is when the laod balancer from the specified availability sets is selected that has the -// miinimum rules associated with it. -const ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode" + // ServiceAnnotationLoadBalancerMode is the annotation used on the service to specify the + // Azure load balancer selection based on availability sets + // There are currently three possible load balancer selection modes : + // 1. Default mode - service has no annotation ("service.beta.kubernetes.io/azure-load-balancer-mode") + // In this case the Loadbalancer of the primary Availability set is selected + // 2. 
"__auto__" mode - service is annotated with __auto__ value, this when loadbalancer from any availability set + // is selected which has the miinimum rules associated with it. + // 3. "as1,as2" mode - this is when the laod balancer from the specified availability sets is selected that has the + // miinimum rules associated with it. + ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode" -// ServiceAnnotationLoadBalancerAutoModeValue the annotation used on the service to specify the -// Azure load balancer auto selection from the availability sets -const ServiceAnnotationLoadBalancerAutoModeValue = "__auto__" + // ServiceAnnotationLoadBalancerAutoModeValue the annotation used on the service to specify the + // Azure load balancer auto selection from the availability sets + ServiceAnnotationLoadBalancerAutoModeValue = "__auto__" -// ServiceAnnotationDNSLabelName annotation speficying the DNS label name for the service. -const ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name" + // ServiceAnnotationDNSLabelName annotation speficying the DNS label name for the service. + ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name" -// ServiceAnnotationSharedSecurityRule is the annotation used on the service -// to specify that the service should be exposed using an Azure security rule -// that may be shared with other service, trading specificity of rules for an -// increase in the number of services that can be exposed. This relies on the -// Azure "augmented security rules" feature which at the time of writing is in -// preview and available only in certain regions. 
-const ServiceAnnotationSharedSecurityRule = "service.beta.kubernetes.io/azure-shared-securityrule" + // ServiceAnnotationSharedSecurityRule is the annotation used on the service + // to specify that the service should be exposed using an Azure security rule + // that may be shared with other service, trading specificity of rules for an + // increase in the number of services that can be exposed. This relies on the + // Azure "augmented security rules" feature which at the time of writing is in + // preview and available only in certain regions. + ServiceAnnotationSharedSecurityRule = "service.beta.kubernetes.io/azure-shared-securityrule" +) // GetLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. @@ -166,15 +165,16 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi return nil } -// getServiceLoadBalancer gets the loadbalancer for the service if it already exists -// If wantLb is TRUE then -it selects a new load balancer +// getServiceLoadBalancer gets the loadbalancer for the service if it already exists. +// If wantLb is TRUE then -it selects a new load balancer. // In case the selected load balancer does not exists it returns network.LoadBalancer struct -// with added metadata (such as name, location) and existsLB set to FALSE -// By default - cluster default LB is returned +// with added metadata (such as name, location) and existsLB set to FALSE. +// By default - cluster default LB is returned. 
func (az *Cloud) getServiceLoadBalancer(service *v1.Service, clusterName string, nodes []*v1.Node, wantLb bool) (lb *network.LoadBalancer, status *v1.LoadBalancerStatus, exists bool, err error) { isInternal := requiresInternalLoadBalancer(service) var defaultLB *network.LoadBalancer - defaultLBName := az.getLoadBalancerName(clusterName, az.Config.PrimaryAvailabilitySetName, isInternal) + primaryVMSetName := az.vmSet.GetPrimaryVMSetName() + defaultLBName := az.getLoadBalancerName(clusterName, primaryVMSetName, isInternal) existingLBs, err := az.ListLBWithRetry() if err != nil { @@ -234,18 +234,19 @@ func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, exi isInternal := requiresInternalLoadBalancer(service) serviceName := getServiceName(service) glog.V(3).Infof("selectLoadBalancer(%s): isInternal(%s) - start", serviceName, isInternal) - availabilitySetNames, err := az.getLoadBalancerAvailabilitySetNames(service, nodes) + vmSetNames, err := az.vmSet.GetVMSetNames(service, nodes) if err != nil { - glog.Errorf("az.selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - az.getLoadBalancerAvailabilitySetNames failed, err=(%v)", clusterName, serviceName, isInternal, err) + glog.Errorf("az.selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - az.GetVMSetNames failed, err=(%v)", clusterName, serviceName, isInternal, err) return nil, false, err } - glog.Infof("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - availabilitysetsnames %v", clusterName, serviceName, isInternal, *availabilitySetNames) + glog.Infof("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - vmSetNames %v", clusterName, serviceName, isInternal, *vmSetNames) + mapExistingLBs := map[string]network.LoadBalancer{} for _, lb := range *existingLBs { mapExistingLBs[*lb.Name] = lb } selectedLBRuleCount := math.MaxInt32 - for _, currASName := range *availabilitySetNames { + for _, currASName := range *vmSetNames { currLBName := 
az.getLoadBalancerName(clusterName, currASName, isInternal) lb, exists := mapExistingLBs[currLBName] if !exists { @@ -272,13 +273,13 @@ func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, exi } if selectedLB == nil { - err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - unable to find load balancer for selected availability sets %v", clusterName, serviceName, isInternal, *availabilitySetNames) + err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - unable to find load balancer for selected VM sets %v", clusterName, serviceName, isInternal, *vmSetNames) glog.Error(err) return nil, false, err } // validate if the selected LB has not exceeded the MaximumLoadBalancerRuleCount if az.Config.MaximumLoadBalancerRuleCount != 0 && selectedLBRuleCount >= az.Config.MaximumLoadBalancerRuleCount { - err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - all available load balancers have exceeded maximum rule limit %d, availabilitysetnames (%v)", clusterName, serviceName, isInternal, selectedLBRuleCount, *availabilitySetNames) + err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - all available load balancers have exceeded maximum rule limit %d, vmSetNames (%v)", clusterName, serviceName, isInternal, selectedLBRuleCount, *vmSetNames) glog.Error(err) return selectedLB, existsLb, err } @@ -741,6 +742,11 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, // because an Azure load balancer cannot have an empty FrontendIPConfigurations collection glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName) + // Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB. + vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName) + az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName) + + // Remove the LB. 
az.operationPollRateLimiter.Accept() glog.V(10).Infof("LoadBalancerClient.Delete(%q): start", lbName) err := az.DeleteLBWithRetry(lbName) @@ -761,23 +767,10 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, if wantLb && nodes != nil { // Add the machines to the backend pool if they're not already - availabilitySetName := az.mapLoadBalancerNameToAvailabilitySet(lbName, clusterName) - hostUpdates := make([]func() error, len(nodes)) - for i, node := range nodes { - localNodeName := node.Name - f := func() error { - err := az.ensureHostInPool(serviceName, types.NodeName(localNodeName), lbBackendPoolID, availabilitySetName) - if err != nil { - return fmt.Errorf("ensure(%s): lb(%s) - failed to ensure host in pool: %q", serviceName, lbName, err) - } - return nil - } - hostUpdates[i] = f - } - - errs := utilerrors.AggregateGoroutines(hostUpdates...) - if errs != nil { - return nil, utilerrors.Flatten(errs) + vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName) + err := az.vmSet.EnsureHostsInPool(serviceName, nodes, lbBackendPoolID, vmSetName) + if err != nil { + return nil, err } } @@ -1246,95 +1239,6 @@ func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) b return false } -// This ensures the given VM's Primary NIC's Primary IP Configuration is -// participating in the specified LoadBalancer Backend Pool. 
-func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, availabilitySetName string) error { - var machine compute.VirtualMachine - vmName := mapNodeNameToVMName(nodeName) - az.operationPollRateLimiter.Accept() - glog.V(10).Infof("VirtualMachinesClient.Get(%q): start", vmName) - machine, err := az.VirtualMachineClientGetWithRetry(az.ResourceGroup, vmName, "") - if err != nil { - glog.V(2).Infof("ensureHostInPool(%s, %s, %s) abort backoff", serviceName, nodeName, backendPoolID) - return err - } - glog.V(10).Infof("VirtualMachinesClient.Get(%q): end", vmName) - - primaryNicID, err := getPrimaryInterfaceID(machine) - if err != nil { - return err - } - nicName, err := getLastSegment(primaryNicID) - if err != nil { - return err - } - - // Check availability set - if availabilitySetName != "" { - expectedAvailabilitySetName := az.getAvailabilitySetID(availabilitySetName) - if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) { - glog.V(3).Infof( - "nicupdate(%s): skipping nic (%s) since it is not in the availabilitySet(%s)", - serviceName, nicName, availabilitySetName) - return nil - } - } - - az.operationPollRateLimiter.Accept() - glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) - nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "") - glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) - if err != nil { - return err - } - - var primaryIPConfig *network.InterfaceIPConfiguration - primaryIPConfig, err = getPrimaryIPConfig(nic) - if err != nil { - return err - } - - foundPool := false - newBackendPools := []network.BackendAddressPool{} - if primaryIPConfig.LoadBalancerBackendAddressPools != nil { - newBackendPools = *primaryIPConfig.LoadBalancerBackendAddressPools - } - for _, existingPool := range newBackendPools { - if strings.EqualFold(backendPoolID, *existingPool.ID) { - foundPool = true - break - } - } - if !foundPool { - 
newBackendPools = append(newBackendPools, - network.BackendAddressPool{ - ID: to.StringPtr(backendPoolID), - }) - - primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools - - glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName) - az.operationPollRateLimiter.Accept() - glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): start", *nic.Name) - respChan, errChan := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil) - resp := <-respChan - err := <-errChan - glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name) - if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) { - glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err) - retryErr := az.CreateOrUpdateInterfaceWithRetry(nic) - if retryErr != nil { - err = retryErr - glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName) - } - } - if err != nil { - return err - } - } - return nil -} - // Check if service requires an internal load balancer. func requiresInternalLoadBalancer(service *v1.Service) bool { if l, ok := service.Annotations[ServiceAnnotationLoadBalancerInternal]; ok { @@ -1354,28 +1258,28 @@ func subnet(service *v1.Service) *string { return nil } -// getServiceLoadBalancerMode parses the mode value -// if the value is __auto__ it returns isAuto = TRUE -// if anything else it returns the unique availability set names after triming spaces -func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, availabilitySetNames []string) { +// getServiceLoadBalancerMode parses the mode value. +// if the value is __auto__ it returns isAuto = TRUE. +// if anything else it returns the unique VM set names after triming spaces. 
+func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, vmSetNames []string) { mode, hasMode := service.Annotations[ServiceAnnotationLoadBalancerMode] mode = strings.TrimSpace(mode) isAuto = strings.EqualFold(mode, ServiceAnnotationLoadBalancerAutoModeValue) if !isAuto { // Break up list of "AS1,AS2" - availabilitySetParsedList := strings.Split(mode, ",") + vmSetParsedList := strings.Split(mode, ",") - // Trim the availability set names and remove duplicates + // Trim the VM set names and remove duplicates // e.g. {"AS1"," AS2", "AS3", "AS3"} => {"AS1", "AS2", "AS3"} - availabilitySetNameSet := sets.NewString() - for _, v := range availabilitySetParsedList { - availabilitySetNameSet.Insert(strings.TrimSpace(v)) + vmSetNameSet := sets.NewString() + for _, v := range vmSetParsedList { + vmSetNameSet.Insert(strings.TrimSpace(v)) } - availabilitySetNames = availabilitySetNameSet.List() + vmSetNames = vmSetNameSet.List() } - return hasMode, isAuto, availabilitySetNames + return hasMode, isAuto, vmSetNames } func useSharedSecurityRule(service *v1.Service) bool { diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 073a82e6b36..f141ae2d1b0 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -870,6 +870,7 @@ func getTestCloud() (az *Cloud) { az.SecurityGroupsClient = newFakeAzureNSGClient() az.VirtualMachinesClient = newFakeAzureVirtualMachinesClient() az.InterfacesClient = newFakeAzureInterfacesClient() + az.vmSet = newAvailabilitySet(az) return az } @@ -1631,7 +1632,8 @@ func TestDecodeInstanceInfo(t *testing.T) { } } -func TestSplitProviderID(t *testing.T) { +func TestGetNodeNameByProviderID(t *testing.T) { + az := getTestCloud() providers := []struct { providerID string name types.NodeName @@ -1666,7 +1668,7 @@ func TestSplitProviderID(t *testing.T) { } for _, test := range providers { - name, err := 
splitProviderID(test.providerID) + name, err := az.vmSet.GetNodeNameByProviderID(test.providerID) if (err != nil) != test.fail { t.Errorf("Expected to failt=%t, with pattern %v", test.fail, test) } diff --git a/pkg/cloudprovider/providers/azure/azure_util.go b/pkg/cloudprovider/providers/azure/azure_util.go index 958275ca66b..6181550571b 100644 --- a/pkg/cloudprovider/providers/azure/azure_util.go +++ b/pkg/cloudprovider/providers/azure/azure_util.go @@ -30,8 +30,10 @@ import ( "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest/to" "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" ) @@ -54,6 +56,7 @@ const ( nodeLabelRole = "kubernetes.io/role" ) +var errNotInVMSet = errors.New("vm is not in the vmset") var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`) // returns the full identifier of a machine @@ -133,115 +136,22 @@ func (az *Cloud) getpublicIPAddressID(pipName string) string { pipName) } -// getLoadBalancerAvailabilitySetNames selects all possible availability sets for -// service load balancer, if the service has no loadbalancer mode annotaion returns the -// primary availability set if service annotation for loadbalancer availability set -// exists then return the eligible a availability set -func (az *Cloud) getLoadBalancerAvailabilitySetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) { - hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service) - if !hasMode { - // no mode specified in service annotation default to PrimaryAvailabilitySetName - availabilitySetNames = &[]string{az.Config.PrimaryAvailabilitySetName} - return availabilitySetNames, nil - } - availabilitySetNames, err = az.getAgentPoolAvailabiliySets(nodes) - if err != nil { - 
glog.Errorf("az.getLoadBalancerAvailabilitySetNames - getAgentPoolAvailabiliySets failed err=(%v)", err) - return nil, err - } - if len(*availabilitySetNames) == 0 { - glog.Errorf("az.getLoadBalancerAvailabilitySetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes)) - return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes)) - } - // sort the list to have deterministic selection - sort.Strings(*availabilitySetNames) - if !isAuto { - if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 { - return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value") - } - // validate availability set exists - var found bool - for sasx := range serviceAvailabilitySetNames { - for asx := range *availabilitySetNames { - if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) { - found = true - serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx] - break - } - } - if !found { - glog.Errorf("az.getLoadBalancerAvailabilitySetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx]) - return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx]) - } - } - availabilitySetNames = &serviceAvailabilitySetNames - } - - return availabilitySetNames, nil -} - -// lists the virtual machines for for the resource group and then builds -// a list of availability sets that match the nodes available to k8s -func (az *Cloud) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) { - vms, err := az.VirtualMachineClientListWithRetry() - if err != nil { - glog.Errorf("az.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err) - return nil, err - } - vmNameToAvailabilitySetID := make(map[string]string, len(vms)) - for vmx := range vms { - vm := vms[vmx] - if 
vm.AvailabilitySet != nil { - vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID - } - } - availabilitySetIDs := sets.NewString() - agentPoolAvailabilitySets = &[]string{} - for nx := range nodes { - nodeName := (*nodes[nx]).Name - if isMasterNode(nodes[nx]) { - continue - } - asID, ok := vmNameToAvailabilitySetID[nodeName] - if !ok { - glog.Errorf("az.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName) - return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName) - } - if availabilitySetIDs.Has(asID) { - // already added in the list - continue - } - asName, err := getLastSegment(asID) - if err != nil { - glog.Errorf("az.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err) - return nil, err - } - // AvailabilitySet ID is currently upper cased in a indeterministic way - // We want to keep it lower case, before the ID get fixed - asName = strings.ToLower(asName) - - *agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName) - } - - return agentPoolAvailabilitySets, nil -} - -func (az *Cloud) mapLoadBalancerNameToAvailabilitySet(lbName string, clusterName string) (availabilitySetName string) { - availabilitySetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix) +func (az *Cloud) mapLoadBalancerNameToVMSet(lbName string, clusterName string) (vmSetName string) { + vmSetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix) if strings.EqualFold(clusterName, lbName) { - availabilitySetName = az.Config.PrimaryAvailabilitySetName + vmSetName = az.vmSet.GetPrimaryVMSetName() } - return availabilitySetName + return vmSetName } // For a load balancer, all frontend ip should reference either a subnet or publicIpAddress. // Thus Azure do not allow mixed type (public and internal) load balancer. // So we'd have a separate name for internal load balancer. // This would be the name for Azure LoadBalancer resource. 
-func (az *Cloud) getLoadBalancerName(clusterName string, availabilitySetName string, isInternal bool) string { - lbNamePrefix := availabilitySetName - if strings.EqualFold(availabilitySetName, az.Config.PrimaryAvailabilitySetName) { +func (az *Cloud) getLoadBalancerName(clusterName string, vmSetName string, isInternal bool) string { + lbNamePrefix := vmSetName + if strings.EqualFold(vmSetName, az.vmSet.GetPrimaryVMSetName()) { lbNamePrefix = clusterName } if isInternal { @@ -402,67 +312,7 @@ outer: } func (az *Cloud) getIPForMachine(nodeName types.NodeName) (string, error) { - if az.Config.VMType == vmTypeVMSS { - ip, err := az.getIPForVmssMachine(nodeName) - if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { - return az.getIPForStandardMachine(nodeName) - } - - return ip, err - } - - return az.getIPForStandardMachine(nodeName) -} - -func (az *Cloud) getIPForStandardMachine(nodeName types.NodeName) (string, error) { - az.operationPollRateLimiter.Accept() - machine, exists, err := az.getVirtualMachine(nodeName) - if !exists { - return "", cloudprovider.InstanceNotFound - } - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), az.getVirtualMachine(%s), err=%v", nodeName, nodeName, err) - return "", err - } - - nicID, err := getPrimaryInterfaceID(machine) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err) - return "", err - } - - nicName, err := getLastSegment(nicID) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err) - return "", err - } - - az.operationPollRateLimiter.Accept() - glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) - nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "") - glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), az.InterfacesClient.Get(%s, %s, %s), err=%v", nodeName, 
az.ResourceGroup, nicName, "", err) - return "", err - } - - ipConfig, err := getPrimaryIPConfig(nic) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err) - return "", err - } - - targetIP := *ipConfig.PrivateIPAddress - return targetIP, nil -} - -// splitProviderID converts a providerID to a NodeName. -func splitProviderID(providerID string) (types.NodeName, error) { - matches := providerIDRE.FindStringSubmatch(providerID) - if len(matches) != 2 { - return "", errors.New("error splitting providerID") - } - return types.NodeName(matches[1]), nil + return az.vmSet.GetIPByNodeName(string(nodeName), "") } var polyTable = crc32.MakeTable(crc32.Koopman) @@ -519,3 +369,333 @@ func ExtractDiskData(diskData interface{}) (provisioningState string, diskState } return provisioningState, diskState, nil } + +// availabilitySet implements VMSet interface for Azure availability sets. +type availabilitySet struct { + *Cloud +} + +// newStandardSet creates a new availabilitySet. +func newAvailabilitySet(az *Cloud) VMSet { + return &availabilitySet{ + Cloud: az, + } +} + +// GetInstanceIDByNodeName gets the cloud provider ID by node name. +// It must return ("", cloudprovider.InstanceNotFound) if the instance does +// not exist or is no longer running. 
+func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) { + var machine compute.VirtualMachine + var exists bool + var err error + + as.operationPollRateLimiter.Accept() + machine, exists, err = as.getVirtualMachine(types.NodeName(name)) + if err != nil { + if as.CloudProviderBackoff { + glog.V(2).Infof("InstanceID(%s) backing off", name) + machine, exists, err = as.GetVirtualMachineWithRetry(types.NodeName(name)) + if err != nil { + glog.V(2).Infof("InstanceID(%s) abort backoff", name) + return "", err + } + } else { + return "", err + } + } else if !exists { + return "", cloudprovider.InstanceNotFound + } + return *machine.ID, nil +} + +// GetNodeNameByProviderID gets the node name by provider ID. +func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) { + // NodeName is part of providerID for standard instances. + matches := providerIDRE.FindStringSubmatch(providerID) + if len(matches) != 2 { + return "", errors.New("error splitting providerID") + } + + return types.NodeName(matches[1]), nil +} + +// GetInstanceTypeByNodeName gets the instance type by node name. +func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) { + machine, exists, err := as.getVirtualMachine(types.NodeName(name)) + if err != nil { + glog.Errorf("error: as.GetInstanceTypeByNodeName(%s), as.getVirtualMachine(%s) err=%v", name, name, err) + return "", err + } else if !exists { + return "", cloudprovider.InstanceNotFound + } + + return string(machine.HardwareProfile.VMSize), nil +} + +// GetZoneByNodeName gets zone from instance view. 
+func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { + vm, err := as.VirtualMachinesClient.Get(as.ResourceGroup, name, compute.InstanceView) + if err != nil { + return cloudprovider.Zone{}, err + } + + failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain)) + zone := cloudprovider.Zone{ + FailureDomain: failureDomain, + Region: *(vm.Location), + } + return zone, nil +} + +// GetPrimaryVMSetName returns the VM set name depending on the configured vmType. +// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType. +func (as *availabilitySet) GetPrimaryVMSetName() string { + return as.Config.PrimaryAvailabilitySetName +} + +// GetIPByNodeName gets machine IP by node name. +func (as *availabilitySet) GetIPByNodeName(name, vmSetName string) (string, error) { + nic, err := as.GetPrimaryInterface(name, vmSetName) + if err != nil { + return "", err + } + + ipConfig, err := getPrimaryIPConfig(nic) + if err != nil { + glog.Errorf("error: as.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", name, nic, err) + return "", err + } + + targetIP := *ipConfig.PrivateIPAddress + return targetIP, nil +} + +// getAgentPoolAvailabiliySets lists the virtual machines for for the resource group and then builds +// a list of availability sets that match the nodes available to k8s. 
+func (as *availabilitySet) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) { + vms, err := as.VirtualMachineClientListWithRetry() + if err != nil { + glog.Errorf("as.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err) + return nil, err + } + vmNameToAvailabilitySetID := make(map[string]string, len(vms)) + for vmx := range vms { + vm := vms[vmx] + if vm.AvailabilitySet != nil { + vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID + } + } + availabilitySetIDs := sets.NewString() + agentPoolAvailabilitySets = &[]string{} + for nx := range nodes { + nodeName := (*nodes[nx]).Name + if isMasterNode(nodes[nx]) { + continue + } + asID, ok := vmNameToAvailabilitySetID[nodeName] + if !ok { + glog.Errorf("as.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName) + return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName) + } + if availabilitySetIDs.Has(asID) { + // already added in the list + continue + } + asName, err := getLastSegment(asID) + if err != nil { + glog.Errorf("as.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err) + return nil, err + } + // AvailabilitySet ID is currently upper cased in a indeterministic way + // We want to keep it lower case, before the ID get fixed + asName = strings.ToLower(asName) + + *agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName) + } + + return agentPoolAvailabilitySets, nil +} + +// GetVMSetNames selects all possible availability sets or scale sets +// (depending vmType configured) for service load balancer, if the service has +// no loadbalancer mode annotaion returns the primary VMSet. If service annotation +// for loadbalancer exists then return the eligible VMSet. 
+func (as *availabilitySet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) { + hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service) + if !hasMode { + // no mode specified in service annotation default to PrimaryAvailabilitySetName + availabilitySetNames = &[]string{as.Config.PrimaryAvailabilitySetName} + return availabilitySetNames, nil + } + availabilitySetNames, err = as.getAgentPoolAvailabiliySets(nodes) + if err != nil { + glog.Errorf("as.GetVMSetNames - getAgentPoolAvailabiliySets failed err=(%v)", err) + return nil, err + } + if len(*availabilitySetNames) == 0 { + glog.Errorf("as.GetVMSetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes)) + return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes)) + } + // sort the list to have deterministic selection + sort.Strings(*availabilitySetNames) + if !isAuto { + if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 { + return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value") + } + // validate availability set exists + var found bool + for sasx := range serviceAvailabilitySetNames { + for asx := range *availabilitySetNames { + if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) { + found = true + serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx] + break + } + } + if !found { + glog.Errorf("as.GetVMSetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx]) + return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx]) + } + } + availabilitySetNames = &serviceAvailabilitySetNames + } + + return availabilitySetNames, nil +} + +// GetPrimaryInterface gets machine primary network interface by node name and vmSet. 
+func (as *availabilitySet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) { + var machine compute.VirtualMachine + + as.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachinesClient.Get(%q): start", nodeName) + machine, err := as.VirtualMachineClientGetWithRetry(as.ResourceGroup, nodeName, "") + if err != nil { + glog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName) + return network.Interface{}, err + } + glog.V(10).Infof("VirtualMachinesClient.Get(%q): end", nodeName) + + primaryNicID, err := getPrimaryInterfaceID(machine) + if err != nil { + return network.Interface{}, err + } + nicName, err := getLastSegment(primaryNicID) + if err != nil { + return network.Interface{}, err + } + + // Check availability set + if vmSetName != "" { + expectedAvailabilitySetName := as.getAvailabilitySetID(vmSetName) + if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) { + glog.V(3).Infof( + "GetPrimaryInterface: nic (%s) is not in the availabilitySet(%s)", nicName, vmSetName) + return network.Interface{}, errNotInVMSet + } + } + + as.operationPollRateLimiter.Accept() + glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) + nic, err := as.InterfacesClient.Get(as.ResourceGroup, nicName, "") + glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) + if err != nil { + return network.Interface{}, err + } + + return nic, nil +} + +// ensureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is +// participating in the specified LoadBalancer Backend Pool. 
+func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, vmSetName string) error { + vmName := mapNodeNameToVMName(nodeName) + nic, err := as.GetPrimaryInterface(vmName, vmSetName) + if err != nil { + if err == errNotInVMSet { + glog.V(3).Infof("ensureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName) + return nil + } + + glog.Errorf("error: az.ensureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err) + return err + } + + var primaryIPConfig *network.InterfaceIPConfiguration + primaryIPConfig, err = getPrimaryIPConfig(nic) + if err != nil { + return err + } + + foundPool := false + newBackendPools := []network.BackendAddressPool{} + if primaryIPConfig.LoadBalancerBackendAddressPools != nil { + newBackendPools = *primaryIPConfig.LoadBalancerBackendAddressPools + } + for _, existingPool := range newBackendPools { + if strings.EqualFold(backendPoolID, *existingPool.ID) { + foundPool = true + break + } + } + if !foundPool { + newBackendPools = append(newBackendPools, + network.BackendAddressPool{ + ID: to.StringPtr(backendPoolID), + }) + + primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools + + nicName := *nic.Name + glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName) + as.operationPollRateLimiter.Accept() + glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): start", *nic.Name) + respChan, errChan := as.InterfacesClient.CreateOrUpdate(as.ResourceGroup, *nic.Name, nic, nil) + resp := <-respChan + err := <-errChan + glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name) + if as.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) { + glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err) + retryErr := as.CreateOrUpdateInterfaceWithRetry(nic) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("nicupdate(%s) abort 
backoff: nic(%s) - updating", serviceName, nicName) + } + } + if err != nil { + return err + } + } + return nil +} + +// EnsureHostsInPool ensures the given Node's primary IP configurations are +// participating in the specified LoadBalancer Backend Pool. +func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error { + hostUpdates := make([]func() error, len(nodes)) + for i, node := range nodes { + localNodeName := node.Name + f := func() error { + err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName) + if err != nil { + return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", serviceName, backendPoolID, err) + } + return nil + } + hostUpdates[i] = f + } + + errs := utilerrors.AggregateGoroutines(hostUpdates...) + if errs != nil { + return utilerrors.Flatten(errs) + } + + return nil +} + +// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet. +func (as *availabilitySet) EnsureBackendPoolDeleted(poolID, vmSetName string) error { + // Do nothing for availability set. 
+ return nil +} diff --git a/pkg/cloudprovider/providers/azure/azure_util_test.go b/pkg/cloudprovider/providers/azure/azure_util_test.go index 46f351f47b5..cac803c2eb0 100644 --- a/pkg/cloudprovider/providers/azure/azure_util_test.go +++ b/pkg/cloudprovider/providers/azure/azure_util_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestGetVmssInstanceID(t *testing.T) { +func TestGetScaleSetVMInstanceID(t *testing.T) { tests := []struct { msg string machineName string @@ -43,7 +43,7 @@ func TestGetVmssInstanceID(t *testing.T) { } for i, test := range tests { - instanceID, err := getVmssInstanceID(test.machineName) + instanceID, err := getScaleSetVMInstanceID(test.machineName) if test.expectError { assert.Error(t, err, fmt.Sprintf("TestCase[%d]: %s", i, test.msg)) } else { diff --git a/pkg/cloudprovider/providers/azure/azure_util_vmss.go b/pkg/cloudprovider/providers/azure/azure_util_vmss.go index e16e3173552..45f631af951 100644 --- a/pkg/cloudprovider/providers/azure/azure_util_vmss.go +++ b/pkg/cloudprovider/providers/azure/azure_util_vmss.go @@ -17,51 +17,222 @@ limitations under the License. package azure import ( + "errors" "fmt" + "regexp" + "sort" "strconv" + "strings" "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest/to" "github.com/golang/glog" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/cloudprovider" ) -func (az *Cloud) getIPForVmssMachine(nodeName types.NodeName) (string, error) { - az.operationPollRateLimiter.Accept() - machine, exists, err := az.getVmssVirtualMachine(nodeName) +var ( + // ErrorNotVmssInstance indicates an instance is not belongint to any vmss. 
+ ErrorNotVmssInstance = errors.New("not a vmss instance") + + scaleSetNameRE = regexp.MustCompile(`^/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`) +) + +// scaleSet implements VMSet interface for Azure scale set. +type scaleSet struct { + *Cloud + + // availabilitySet is also required for scaleSet because some instances + // (e.g. master nodes) may not belong to any scale sets. + availabilitySet VMSet +} + +// newScaleSet creates a new scaleSet. +func newScaleSet(az *Cloud) VMSet { + return &scaleSet{ + Cloud: az, + availabilitySet: newAvailabilitySet(az), + } +} + +// GetInstanceIDByNodeName gets the cloud provider ID by node name. +// It must return ("", cloudprovider.InstanceNotFound) if the instance does +// not exist or is no longer running. +func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { + instanceID, err := ss.getScaleSetInstanceIDByName(name, ss.PrimaryScaleSetName) + if err != nil { + if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { + // Retry with standard type because master nodes may not belong to any vmss. + // TODO: find a better way to identify the type of VM. 
+ return ss.availabilitySet.GetInstanceIDByNodeName(name) + } + + return "", err + } + + return instanceID, nil +} + +func (ss *scaleSet) getScaleSetInstanceIDByName(name, scaleSetName string) (string, error) { + var machine compute.VirtualMachineScaleSetVM + var exists bool + var err error + + ss.operationPollRateLimiter.Accept() + machine, exists, err = ss.getScaleSetVM(name, scaleSetName) + if err != nil { + if ss.CloudProviderBackoff { + glog.V(2).Infof("InstanceID(%s) backing off", name) + machine, exists, err = ss.GetScaleSetsVMWithRetry(types.NodeName(name), scaleSetName) + if err != nil { + glog.V(2).Infof("InstanceID(%s) abort backoff", name) + return "", err + } + } else { + return "", err + } + } else if !exists { + return "", cloudprovider.InstanceNotFound + } + + return *machine.ID, nil +} + +func (ss *scaleSet) getScaleSetVM(nodeName, scaleSetName string) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) { + instanceID, err := getScaleSetVMInstanceID(nodeName) + if err != nil { + return vm, false, err + } + + return ss.getScaleSetVMByID(instanceID, scaleSetName) +} + +func (ss *scaleSet) getScaleSetVMByID(instanceID, scaleSetName string) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) { + var realErr error + + // scaleSetName is required to query VM info. + if scaleSetName == "" { + scaleSetName = ss.PrimaryScaleSetName + } + + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): start", instanceID) + vm, err = ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, scaleSetName, instanceID) + glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): end", instanceID) + + exists, realErr = checkResourceExistsFromError(err) + if realErr != nil { + return vm, false, realErr + } + if !exists { + return vm, false, nil + } + + return vm, exists, err +} + +// GetNodeNameByProviderID gets the node name by provider ID. 
+func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) { + // NodeName is not part of providerID for vmss instances. + parts := strings.Split(providerID, "/") + instanceID := parts[len(parts)-1] + machine, exist, err := ss.getScaleSetVMByID(instanceID, ss.PrimaryScaleSetName) + if !exist { return "", cloudprovider.InstanceNotFound } if err != nil { - glog.Errorf("error: az.getIPForVmssMachine(%s), az.getVmssVirtualMachine(%s), err=%v", nodeName, nodeName, err) return "", err } - nicID, err := getPrimaryInterfaceIDForVmssMachine(machine) + return types.NodeName(*machine.OsProfile.ComputerName), nil +} + +// GetInstanceTypeByNodeName gets the instance type by node name. +func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { + instanceType, err := ss.getScaleSetInstanceTypeByNodeName(name) if err != nil { - glog.Errorf("error: az.getIPForVmssMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err) + if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { + // Retry with standard type because master nodes may not belong to any vmss. + // TODO: find a better way to identify the type of VM. 
+ return ss.availabilitySet.GetInstanceTypeByNodeName(name) + } + return "", err } - nicName, err := getLastSegment(nicID) + return instanceType, nil +} + +func (ss *scaleSet) getScaleSetInstanceTypeByNodeName(name string) (string, error) { + machine, exists, err := ss.getScaleSetVM(name, ss.PrimaryScaleSetName) if err != nil { - glog.Errorf("error: az.getIPForVmssMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err) + glog.Errorf("error: ss.getScaleSetInstanceTypeByNodeName(%s), ss.getScaleSetVM(%s) err=%v", name, name, err) return "", err + } else if !exists { + return "", cloudprovider.InstanceNotFound } - az.operationPollRateLimiter.Accept() - glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) - nic, err := az.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(az.ResourceGroup, az.Config.PrimaryScaleSetName, *machine.InstanceID, nicName, "") - glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) + if machine.Sku.Name != nil { + return *machine.Sku.Name, nil + } + + return "", fmt.Errorf("instance type is not defined") +} + +// GetZoneByNodeName gets cloudprovider.Zone by node name. +func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { + instanceID, err := getScaleSetVMInstanceID(name) if err != nil { - glog.Errorf("error: az.getIPForVmssMachine(%s), az.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, az.ResourceGroup, nicName, "", err) + if err == ErrorNotVmssInstance { + // Retry with standard type because master nodes may not belong to any vmss. + // TODO: find a better way to identify the type of VM. 
+ return ss.availabilitySet.GetZoneByNodeName(name) + } + return cloudprovider.Zone{}, err + } + + vm, err := ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID) + if err != nil { + return cloudprovider.Zone{}, err + } + + // PlatformFaultDomain is not included in VirtualMachineScaleSetVM, so we get it from VirtualMachineScaleSetVMInstanceView. + vmView, err := ss.VirtualMachineScaleSetVMsClient.GetInstanceView(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID) + if err != nil { + return cloudprovider.Zone{}, err + } + + failureDomain := strconv.Itoa(int(*vmView.PlatformFaultDomain)) + zone := cloudprovider.Zone{ + FailureDomain: failureDomain, + Region: *(vm.Location), + } + return zone, nil +} + +// GetPrimaryVMSetName returns the VM set name depending on the configured vmType. +// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType. +func (ss *scaleSet) GetPrimaryVMSetName() string { + return ss.Config.PrimaryScaleSetName +} + +// GetIPByNodeName gets machine IP by node name. +func (ss *scaleSet) GetIPByNodeName(nodeName, vmSetName string) (string, error) { + nic, err := ss.GetPrimaryInterface(nodeName, vmSetName) + if err != nil { + glog.Errorf("error: ss.GetIPByNodeName(%s), GetPrimaryInterface(%q, %q), err=%v", nodeName, nodeName, vmSetName, err) return "", err } ipConfig, err := getPrimaryIPConfig(nic) if err != nil { - glog.Errorf("error: az.getIPForVmssMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err) + glog.Errorf("error: ss.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err) return "", err } @@ -70,7 +241,7 @@ func (az *Cloud) getIPForVmssMachine(nodeName types.NodeName) (string, error) { } // This returns the full identifier of the primary NIC for the given VM. 
-func getPrimaryInterfaceIDForVmssMachine(machine compute.VirtualMachineScaleSetVM) (string, error) { +func (ss *scaleSet) getPrimaryInterfaceID(machine compute.VirtualMachineScaleSetVM) (string, error) { if len(*machine.NetworkProfile.NetworkInterfaces) == 1 { return *(*machine.NetworkProfile.NetworkInterfaces)[0].ID, nil } @@ -87,7 +258,7 @@ func getPrimaryInterfaceIDForVmssMachine(machine compute.VirtualMachineScaleSetV // machineName is composed of computerNamePrefix and 36-based instanceID. // And instanceID part if in fixed length of 6 characters. // Refer https://msftstack.wordpress.com/2017/05/10/figuring-out-azure-vm-scale-set-machine-names/. -func getVmssInstanceID(machineName string) (string, error) { +func getScaleSetVMInstanceID(machineName string) (string, error) { nameLength := len(machineName) if nameLength < 6 { return "", ErrorNotVmssInstance @@ -100,3 +271,563 @@ func getVmssInstanceID(machineName string) (string, error) { return fmt.Sprintf("%d", instanceID), nil } + +// extractScaleSetNameByVMID extracts the scaleset name by scaleSetVirtualMachine's ID. +func extractScaleSetNameByVMID(vmID string) (string, error) { + matches := scaleSetNameRE.FindStringSubmatch(vmID) + if len(matches) != 2 { + return "", ErrorNotVmssInstance + } + + return matches[1], nil +} + +// listScaleSetsWithRetry lists scale sets with exponential backoff retry. 
+func (ss *scaleSet) listScaleSetsWithRetry() ([]string, error) { + var err error + var result compute.VirtualMachineScaleSetListResult + allScaleSets := make([]string, 0) + + backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.List start for %v", ss.ResourceGroup) + result, err = ss.VirtualMachineScaleSetsClient.List(ss.ResourceGroup) + glog.V(10).Infof("VirtualMachineScaleSetsClient.List end for %v", ss.ResourceGroup) + if err != nil { + glog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", ss.ResourceGroup, err) + return false, err + } + + return true, nil + }) + if backoffError != nil { + return nil, backoffError + } + + appendResults := (result.Value != nil && len(*result.Value) > 1) + for appendResults { + for _, scaleSet := range *result.Value { + allScaleSets = append(allScaleSets, *scaleSet.Name) + } + appendResults = false + + if result.NextLink != nil { + backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.ListNextResults start for %v", ss.ResourceGroup) + result, err = ss.VirtualMachineScaleSetsClient.ListNextResults(result) + glog.V(10).Infof("VirtualMachineScaleSetsClient.ListNextResults end for %v", ss.ResourceGroup) + if err != nil { + glog.Errorf("VirtualMachineScaleSetsClient.ListNextResults for %v failed: %v", ss.ResourceGroup, err) + return false, err + } + + return true, nil + }) + if backoffError != nil { + return nil, backoffError + } + + appendResults = (result.Value != nil && len(*result.Value) > 1) + } + + } + + return allScaleSets, nil +} + +// listScaleSetVMsWithRetry lists VMs belonging to the specified scale set with exponential backoff retry. 
+func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.VirtualMachineScaleSetVM, error) { + var err error + var result compute.VirtualMachineScaleSetVMListResult + allVMs := make([]compute.VirtualMachineScaleSetVM, 0) + + backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List start for %v", scaleSetName) + result, err = ss.VirtualMachineScaleSetVMsClient.List(ss.ResourceGroup, scaleSetName, "", "", "") + glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List end for %v", scaleSetName) + if err != nil { + glog.Errorf("VirtualMachineScaleSetVMsClient.List for %v failed: %v", scaleSetName, err) + return false, err + } + + return true, nil + }) + if backoffError != nil { + return nil, backoffError + } + + appendResults := (result.Value != nil && len(*result.Value) > 1) + for appendResults { + allVMs = append(allVMs, *result.Value...) + appendResults = false + + if result.NextLink != nil { + backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetVMsClient.ListNextResults start for %v", scaleSetName) + result, err = ss.VirtualMachineScaleSetVMsClient.ListNextResults(result) + glog.V(10).Infof("VirtualMachineScaleSetVMsClient.ListNextResults end for %v", ss.ResourceGroup) + if err != nil { + glog.Errorf("VirtualMachineScaleSetVMsClient.ListNextResults for %v failed: %v", scaleSetName, err) + return false, err + } + + return true, nil + }) + if backoffError != nil { + return nil, backoffError + } + + appendResults = (result.Value != nil && len(*result.Value) > 1) + } + + } + + return allVMs, nil +} + +// getAgentPoolAvailabiliySets lists the virtual machines for for the resource group and then builds +// a list of availability sets that match the nodes available to k8s. 
+func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) { + scaleSetNames, err := ss.listScaleSetsWithRetry() + if err != nil { + return nil, err + } + + vmNameToScaleSetName := make(map[string]string, len(scaleSetNames)) + for _, scaleSetName := range scaleSetNames { + vms, err := ss.listScaleSetVMsWithRetry(scaleSetName) + if err != nil { + return nil, err + } + + for idx := range vms { + vm := vms[idx] + if vm.OsProfile != nil || vm.OsProfile.ComputerName != nil { + vmNameToScaleSetName[*vm.OsProfile.ComputerName] = scaleSetName + } + } + } + + agentPoolScaleSets := &[]string{} + availableScaleSetNames := sets.NewString() + for nx := range nodes { + if isMasterNode(nodes[nx]) { + continue + } + + nodeName := nodes[nx].Name + ssName, ok := vmNameToScaleSetName[nodeName] + if !ok { + // TODO: support master nodes not managed by VMSS. + glog.Errorf("Node %q is not belonging to any known scale sets", nodeName) + return nil, fmt.Errorf("node %q is not belonging to any known scale sets", nodeName) + } + + if availableScaleSetNames.Has(ssName) { + continue + } + + *agentPoolScaleSets = append(*agentPoolScaleSets, ssName) + } + + return agentPoolScaleSets, nil +} + +// GetVMSetNames selects all possible availability sets or scale sets +// (depending vmType configured) for service load balancer. If the service has +// no loadbalancer mode annotaion returns the primary VMSet. If service annotation +// for loadbalancer exists then return the eligible VMSet. +func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetNames *[]string, err error) { + hasMode, isAuto, serviceVMSetNames := getServiceLoadBalancerMode(service) + if !hasMode { + // no mode specified in service annotation default to PrimaryScaleSetName. 
+ scaleSetNames := &[]string{ss.Config.PrimaryScaleSetName} + return scaleSetNames, nil + } + + scaleSetNames, err := ss.getAgentPoolScaleSets(nodes) + if err != nil { + glog.Errorf("ss.GetVMSetNames - getAgentPoolScaleSets failed err=(%v)", err) + return nil, err + } + if len(*scaleSetNames) == 0 { + glog.Errorf("ss.GetVMSetNames - No scale sets found for nodes in the cluster, node count(%d)", len(nodes)) + return nil, fmt.Errorf("No scale sets found for nodes, node count(%d)", len(nodes)) + } + + // sort the list to have deterministic selection + sort.Strings(*scaleSetNames) + + if !isAuto { + if serviceVMSetNames == nil || len(serviceVMSetNames) == 0 { + return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value") + } + // validate scale set exists + var found bool + for sasx := range serviceVMSetNames { + for asx := range *scaleSetNames { + if strings.EqualFold((*scaleSetNames)[asx], serviceVMSetNames[sasx]) { + found = true + serviceVMSetNames[sasx] = (*scaleSetNames)[asx] + break + } + } + if !found { + glog.Errorf("ss.GetVMSetNames - scale set (%s) in service annotation not found", serviceVMSetNames[sasx]) + return nil, fmt.Errorf("scale set (%s) - not found", serviceVMSetNames[sasx]) + } + } + vmSetNames = &serviceVMSetNames + } + + return vmSetNames, nil +} + +// GetPrimaryInterface gets machine primary network interface by node name and vmSet. +func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) { + ss.operationPollRateLimiter.Accept() + machine, exists, err := ss.getScaleSetVM(nodeName, vmSetName) + if !exists || err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { + // Retry with standard type because master nodes may not belong to any vmss. + // TODO: find a better way to identify the type of VM. 
+ return ss.availabilitySet.GetPrimaryInterface(nodeName, "") + } + if err != nil { + glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getScaleSetVM(%s), err=%v", nodeName, nodeName, err) + return network.Interface{}, err + } + + nicID, err := ss.getPrimaryInterfaceID(machine) + if err != nil { + glog.Errorf("error: ss.GetPrimaryInterface(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err) + return network.Interface{}, err + } + + nicName, err := getLastSegment(nicID) + if err != nil { + glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, nicID, err) + return network.Interface{}, err + } + + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) + nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, *machine.InstanceID, nicName, "") + glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) + if err != nil { + glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, nicName, "", err) + return network.Interface{}, err + } + + // Fix interface's location, which is required when updating the interface. + // TODO: is this a bug of azure SDK? + if nic.Location == nil || *nic.Location == "" { + nic.Location = &ss.Config.Location + } + + return nic, nil +} + +// getScaleSet gets a scale set by name. 
+func (ss *scaleSet) getScaleSet(name string) (compute.VirtualMachineScaleSet, bool, error) { + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.Get(%s): start", name) + result, err := ss.VirtualMachineScaleSetsClient.Get(ss.ResourceGroup, name) + glog.V(10).Infof("VirtualMachineScaleSetsClient.Get(%s): end", name) + + exists, realErr := checkResourceExistsFromError(err) + if realErr != nil { + return result, false, realErr + } + + if !exists { + return result, false, nil + } + + return result, exists, err +} + +// getScaleSetWithRetry gets scale set with exponential backoff retry +func (ss *scaleSet) getScaleSetWithRetry(name string) (compute.VirtualMachineScaleSet, bool, error) { + var result compute.VirtualMachineScaleSet + var exists bool + + err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { + var retryErr error + result, exists, retryErr = ss.getScaleSet(name) + if retryErr != nil { + glog.Errorf("backoff: failure, will retry,err=%v", retryErr) + return false, nil + } + glog.V(2).Infof("backoff: success") + return true, nil + }) + + return result, exists, err +} + +// getPrimaryNetworkConfiguration gets primary network interface configuration for scale sets. 
+func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) { + networkConfigurations := *networkConfigurationList + if len(networkConfigurations) == 1 { + return &networkConfigurations[0], nil + } + + for idx := range networkConfigurations { + networkConfig := &networkConfigurations[idx] + if networkConfig.Primary != nil && *networkConfig.Primary == true { + return networkConfig, nil + } + } + + return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", scaleSetName) +} + +func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) { + ipConfigurations := *config.IPConfigurations + if len(ipConfigurations) == 1 { + return &ipConfigurations[0], nil + } + + for idx := range ipConfigurations { + ipConfig := &ipConfigurations[idx] + if ipConfig.Primary != nil && *ipConfig.Primary == true { + return ipConfig, nil + } + } + + return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set %q", scaleSetName) +} + +// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry. 
+func (ss *scaleSet) createOrUpdateVMSSWithRetry(virtualMachineScaleSet compute.VirtualMachineScaleSet) error { + return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): start", *virtualMachineScaleSet.Name) + respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet, nil) + resp := <-respChan + err := <-errChan + glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name) + return processRetryResponse(resp.Response, err) + }) +} + +// updateVMSSInstancesWithRetry invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry. +func (ss *scaleSet) updateVMSSInstancesWithRetry(scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error { + return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): start", scaleSetName) + respChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, scaleSetName, vmInstanceIDs, nil) + resp := <-respChan + err := <-errChan + glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): end", scaleSetName) + return processRetryResponse(resp.Response, err) + }) +} + +// EnsureHostsInPool ensures the given Node's primary IP configurations are +// participating in the specified LoadBalancer Backend Pool. 
+func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error { + virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName) + if err != nil { + glog.Errorf("ss.getScaleSetWithRetry(%s) for service %q failed: %v", vmSetName, serviceName, err) + return err + } + if !exists { + errorMessage := fmt.Errorf("Scale set %q not found", vmSetName) + glog.Errorf("%v", errorMessage) + return errorMessage + } + + // Find primary network interface configuration. + networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations + primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName) + if err != nil { + return err + } + + // Find primary IP configuration. + primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName) + if err != nil { + return err + } + + // Update primary IP configuration's LoadBalancerBackendAddressPools. 
+ foundPool := false + newBackendPools := []compute.SubResource{} + if primaryIPConfiguration.LoadBalancerBackendAddressPools != nil { + newBackendPools = *primaryIPConfiguration.LoadBalancerBackendAddressPools + } + for _, existingPool := range newBackendPools { + if strings.EqualFold(backendPoolID, *existingPool.ID) { + foundPool = true + break + } + } + if !foundPool { + newBackendPools = append(newBackendPools, + compute.SubResource{ + ID: to.StringPtr(backendPoolID), + }) + primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools + + glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating", serviceName, vmSetName) + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): start", vmSetName) + respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, vmSetName, virtualMachineScaleSet, nil) + resp := <-respChan + err := <-errChan + glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName) + if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) { + glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%): scale set (%s) - updating, err=%v", serviceName, vmSetName, err) + retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%) abort backoff: scale set (%s) - updating", serviceName, vmSetName) + } + } + if err != nil { + return err + } + } + + // Construct instanceIDs from nodes. 
+ instanceIDs := []string{} + for _, curNode := range nodes { + curScaleSetName, err := extractScaleSetNameByVMID(curNode.Spec.ExternalID) + if err != nil { + glog.V(2).Infof("Node %q is not belonging to any scale sets, omitting it", curNode.Name) + continue + } + if curScaleSetName != vmSetName { + glog.V(2).Infof("Node %q is not belonging to scale set %q, omitting it", curNode.Name, vmSetName) + continue + } + + instanceID, err := getLastSegment(curNode.Spec.ExternalID) + if err != nil { + glog.Errorf("Failed to get last segment from %q: %v", curNode.Spec.ExternalID, err) + return err + } + + instanceIDs = append(instanceIDs, instanceID) + } + + // Update instances to latest VMSS model. + vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ + InstanceIds: &instanceIDs, + } + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): start", vmSetName) + respChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, vmSetName, vmInstanceIDs, nil) + resp := <-respChan + err = <-errChan + glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", vmSetName) + if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) { + glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%): scale set (%s) - updating, err=%v", serviceName, vmSetName, err) + retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%) abort backoff: scale set (%s) - updating", serviceName, vmSetName) + } + } + if err != nil { + return err + } + + return nil +} + +// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet. 
+func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error { + virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName) + if err != nil { + glog.Errorf("ss.EnsureBackendPoolDeleted(%s, %s) getScaleSetWithRetry(%s) failed: %v", poolID, vmSetName, vmSetName, err) + return err + } + if !exists { + glog.V(2).Infof("ss.EnsureBackendPoolDeleted(%s, %s), scale set %s has already been non-exist", poolID, vmSetName, vmSetName) + return nil + } + + // Find primary network interface configuration. + networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations + primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName) + if err != nil { + return err + } + + // Find primary IP configuration. + primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName) + if err != nil { + return err + } + + // Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration. + if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 { + return nil + } + existingBackendPools := *primaryIPConfiguration.LoadBalancerBackendAddressPools + newBackendPools := []compute.SubResource{} + foundPool := false + for i := len(existingBackendPools) - 1; i >= 0; i-- { + curPool := existingBackendPools[i] + if strings.EqualFold(poolID, *curPool.ID) { + glog.V(10).Infof("EnsureBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, vmSetName) + foundPool = true + newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...) + } + } + if !foundPool { + // Pool not found, assume it has been already removed. + return nil + } + + // Update scale set with backoff. 
+ primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools + glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", vmSetName) + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): start", vmSetName) + respChan, errChan := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ss.ResourceGroup, vmSetName, virtualMachineScaleSet, nil) + resp := <-respChan + err = <-errChan + glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName) + if ss.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) { + glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", vmSetName, err) + retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", vmSetName) + } + } + if err != nil { + return err + } + + // Update instances to latest VMSS model. 
+ instanceIDs := []string{"*"} + vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ + InstanceIds: &instanceIDs, + } + ss.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): start", vmSetName) + updateRespChan, errChan := ss.VirtualMachineScaleSetsClient.UpdateInstances(ss.ResourceGroup, vmSetName, vmInstanceIDs, nil) + updateResp := <-updateRespChan + err = <-errChan + glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", vmSetName) + if ss.CloudProviderBackoff && shouldRetryAPIRequest(updateResp.Response, err) { + glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances scale set (%s) - updating, err=%v", vmSetName, err) + retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances abort backoff: scale set (%s) - updating", vmSetName) + } + } + if err != nil { + return err + } + + return nil +} diff --git a/pkg/cloudprovider/providers/azure/azure_vmsets.go b/pkg/cloudprovider/providers/azure/azure_vmsets.go new file mode 100644 index 00000000000..dd5cc308dcd --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_vmsets.go @@ -0,0 +1,59 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package azure
+
+import (
+	"github.com/Azure/azure-sdk-for-go/arm/network"
+
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+)
+
+// VMSet defines the functions that all vmsets (including scale sets and
+// availability sets) should implement.
+type VMSet interface {
+	// GetInstanceIDByNodeName gets the cloud provider ID by node name.
+	// It must return ("", cloudprovider.InstanceNotFound) if the instance does
+	// not exist or is no longer running.
+	GetInstanceIDByNodeName(name string) (string, error)
+	// GetInstanceTypeByNodeName gets the instance type by node name.
+	GetInstanceTypeByNodeName(name string) (string, error)
+	// GetIPByNodeName gets machine IP by node name.
+	GetIPByNodeName(name, vmSetName string) (string, error)
+	// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
+	GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error)
+	// GetNodeNameByProviderID gets the node name by provider ID.
+	GetNodeNameByProviderID(providerID string) (types.NodeName, error)
+
+	// GetZoneByNodeName gets cloudprovider.Zone by node name.
+	GetZoneByNodeName(name string) (cloudprovider.Zone, error)
+
+	// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
+	// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType.
+	GetPrimaryVMSetName() string
+	// GetVMSetNames selects all possible availability sets or scale sets
+	// (depending on the configured vmType) for service load balancer; if the service has
+	// no loadbalancer mode annotation returns the primary VMSet. If service annotation
+	// for loadbalancer exists then return the eligible VMSet.
+	GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error)
+	// EnsureHostsInPool ensures the given Node's primary IP configurations are
+	// participating in the specified LoadBalancer Backend Pool.
+	EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error
+	// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
+	EnsureBackendPoolDeleted(poolID, vmSetName string) error
+}
diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go
index 52d033b9294..f1aa0def597 100644
--- a/pkg/cloudprovider/providers/azure/azure_wrap.go
+++ b/pkg/cloudprovider/providers/azure/azure_wrap.go
@@ -17,19 +17,14 @@ limitations under the License.
 package azure
 
 import (
-	"errors"
 	"net/http"
 
 	"github.com/Azure/azure-sdk-for-go/arm/compute"
 	"github.com/Azure/azure-sdk-for-go/arm/network"
 	"github.com/Azure/go-autorest/autorest"
 	"github.com/golang/glog"
-	"k8s.io/apimachinery/pkg/types"
-)
-
-var (
-	// ErrorNotVmssInstance indicates an instance is not belongint to any vmss.
-	ErrorNotVmssInstance = errors.New("not a vmss instance")
+
+	"k8s.io/apimachinery/pkg/types"
 )
 
 // checkExistsFromError inspects an error and returns a true if err is nil,
@@ -80,32 +75,6 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName) (vm compute.VirtualM
 	return vm, exists, err
 }
 
-func (az *Cloud) getVmssVirtualMachine(nodeName types.NodeName) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) {
-	var realErr error
-
-	vmName := string(nodeName)
-	instanceID, err := getVmssInstanceID(vmName)
-	if err != nil {
-		return vm, false, err
-	}
-
-	az.operationPollRateLimiter.Accept()
-	glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): start", vmName)
-	vm, err = az.VirtualMachineScaleSetVMsClient.Get(az.ResourceGroup, az.PrimaryScaleSetName, instanceID)
-	glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): end", vmName)
-
-	exists, realErr = checkResourceExistsFromError(err)
-	if realErr != nil {
-		return vm, false, realErr
-	}
-
-	if !exists {
-		return vm, false, nil
-	}
-
-	return vm, exists, err
-}
-
 func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) {
 	var realErr error
diff --git a/pkg/cloudprovider/providers/azure/azure_zones.go b/pkg/cloudprovider/providers/azure/azure_zones.go
index 192456f43a8..75d0c412515 100644
--- a/pkg/cloudprovider/providers/azure/azure_zones.go
+++ b/pkg/cloudprovider/providers/azure/azure_zones.go
@@ -21,13 +21,10 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
-	"strconv"
 	"sync"
 
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/cloudprovider"
-
-	"github.com/Azure/azure-sdk-for-go/arm/compute"
 )
 
 const instanceInfoURL = "http://169.254.169.254/metadata/v1/InstanceInfo"
@@ -63,10 +60,11 @@ func (az *Cloud) GetZone() (cloudprovider.Zone, error) {
 // This is particularly useful in external cloud providers where the kubelet
 // does not initialize node data.
 func (az *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
-	nodeName, err := splitProviderID(providerID)
+	nodeName, err := az.vmSet.GetNodeNameByProviderID(providerID)
 	if err != nil {
 		return cloudprovider.Zone{}, err
 	}
+
 	return az.GetZoneByNodeName(nodeName)
 }
 
@@ -74,20 +72,7 @@ func (az *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, err
 // This is particularly useful in external cloud providers where the kubelet
 // does not initialize node data.
 func (az *Cloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
-
-	vm, err := az.VirtualMachinesClient.Get(az.ResourceGroup, string(nodeName), compute.InstanceView)
-
-	if err != nil {
-		return cloudprovider.Zone{}, err
-	}
-
-	failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))
-
-	zone := cloudprovider.Zone{
-		FailureDomain: failureDomain,
-		Region:        *(vm.Location),
-	}
-	return zone, nil
+	return az.vmSet.GetZoneByNodeName(string(nodeName))
 }
 
 func fetchFaultDomain() (*string, error) {