From 906abde7337bbd9b88b29d416b219046e4da358b Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 13 Dec 2017 14:17:37 +0800 Subject: [PATCH] Add availability sets implementation of VMSet interface --- .../providers/azure/azure_util.go | 502 ++++++++++++------ 1 file changed, 341 insertions(+), 161 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure_util.go b/pkg/cloudprovider/providers/azure/azure_util.go index 958275ca66b..6181550571b 100644 --- a/pkg/cloudprovider/providers/azure/azure_util.go +++ b/pkg/cloudprovider/providers/azure/azure_util.go @@ -30,8 +30,10 @@ import ( "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest/to" "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" ) @@ -54,6 +56,7 @@ const ( nodeLabelRole = "kubernetes.io/role" ) +var errNotInVMSet = errors.New("vm is not in the vmset") var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`) // returns the full identifier of a machine @@ -133,115 +136,22 @@ func (az *Cloud) getpublicIPAddressID(pipName string) string { pipName) } -// getLoadBalancerAvailabilitySetNames selects all possible availability sets for -// service load balancer, if the service has no loadbalancer mode annotaion returns the -// primary availability set if service annotation for loadbalancer availability set -// exists then return the eligible a availability set -func (az *Cloud) getLoadBalancerAvailabilitySetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) { - hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service) - if !hasMode { - // no mode specified in service annotation default to PrimaryAvailabilitySetName - availabilitySetNames = &[]string{az.Config.PrimaryAvailabilitySetName} - return availabilitySetNames, nil - } - availabilitySetNames, err = az.getAgentPoolAvailabiliySets(nodes) - if err != nil { - glog.Errorf("az.getLoadBalancerAvailabilitySetNames - getAgentPoolAvailabiliySets failed err=(%v)", err) - return nil, err - } - if len(*availabilitySetNames) == 0 { - glog.Errorf("az.getLoadBalancerAvailabilitySetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes)) - return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes)) - } - // sort the list to have deterministic selection - sort.Strings(*availabilitySetNames) - if !isAuto { - if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 { - return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value") - } - // validate availability set exists - var found bool - for sasx := range serviceAvailabilitySetNames { - for asx := range *availabilitySetNames { - if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) { - found = true - serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx] - break - } - } - if !found { - glog.Errorf("az.getLoadBalancerAvailabilitySetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx]) - return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx]) - } - } - availabilitySetNames = &serviceAvailabilitySetNames - } - - return availabilitySetNames, nil -} - -// lists the virtual machines for for the resource group and then builds -// a list of availability sets that match the nodes available to k8s -func (az *Cloud) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) { - vms, err := az.VirtualMachineClientListWithRetry() - if err != nil { - glog.Errorf("az.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err) - return nil, err - } - vmNameToAvailabilitySetID := make(map[string]string, len(vms)) - for vmx := range vms { - vm := vms[vmx] - if vm.AvailabilitySet != nil { - vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID - } - } - availabilitySetIDs := sets.NewString() - agentPoolAvailabilitySets = &[]string{} - for nx := range nodes { - nodeName := (*nodes[nx]).Name - if isMasterNode(nodes[nx]) { - continue - } - asID, ok := vmNameToAvailabilitySetID[nodeName] - if !ok { - glog.Errorf("az.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName) - return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName) - } - if availabilitySetIDs.Has(asID) { - // already added in the list - continue - } - asName, err := getLastSegment(asID) - if err != nil { - glog.Errorf("az.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err) - return nil, err - } - // AvailabilitySet ID is currently upper cased in a indeterministic way - // We want to keep it lower case, before the ID get fixed - asName = strings.ToLower(asName) - - *agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName) - } - - return agentPoolAvailabilitySets, nil -} - -func (az *Cloud) mapLoadBalancerNameToAvailabilitySet(lbName string, clusterName string) (availabilitySetName string) { - availabilitySetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix) +func (az *Cloud) mapLoadBalancerNameToVMSet(lbName string, clusterName string) (vmSetName string) { + vmSetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix) if strings.EqualFold(clusterName, lbName) { - availabilitySetName = az.Config.PrimaryAvailabilitySetName + vmSetName = az.vmSet.GetPrimaryVMSetName() } - return availabilitySetName + return vmSetName } // For a load balancer, all frontend ip should reference either a subnet or publicIpAddress. // Thus Azure do not allow mixed type (public and internal) load balancer. // So we'd have a separate name for internal load balancer. // This would be the name for Azure LoadBalancer resource. -func (az *Cloud) getLoadBalancerName(clusterName string, availabilitySetName string, isInternal bool) string { - lbNamePrefix := availabilitySetName - if strings.EqualFold(availabilitySetName, az.Config.PrimaryAvailabilitySetName) { +func (az *Cloud) getLoadBalancerName(clusterName string, vmSetName string, isInternal bool) string { + lbNamePrefix := vmSetName + if strings.EqualFold(vmSetName, az.vmSet.GetPrimaryVMSetName()) { lbNamePrefix = clusterName } if isInternal { @@ -402,67 +312,7 @@ outer: } func (az *Cloud) getIPForMachine(nodeName types.NodeName) (string, error) { - if az.Config.VMType == vmTypeVMSS { - ip, err := az.getIPForVmssMachine(nodeName) - if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { - return az.getIPForStandardMachine(nodeName) - } - - return ip, err - } - - return az.getIPForStandardMachine(nodeName) -} - -func (az *Cloud) getIPForStandardMachine(nodeName types.NodeName) (string, error) { - az.operationPollRateLimiter.Accept() - machine, exists, err := az.getVirtualMachine(nodeName) - if !exists { - return "", cloudprovider.InstanceNotFound - } - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), az.getVirtualMachine(%s), err=%v", nodeName, nodeName, err) - return "", err - } - - nicID, err := getPrimaryInterfaceID(machine) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err) - return "", err - } - - nicName, err := getLastSegment(nicID) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err) - return "", err - } - - az.operationPollRateLimiter.Accept() - glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) - nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "") - glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), az.InterfacesClient.Get(%s, %s, %s), err=%v", nodeName, az.ResourceGroup, nicName, "", err) - return "", err - } - - ipConfig, err := getPrimaryIPConfig(nic) - if err != nil { - glog.Errorf("error: az.getIPForMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err) - return "", err - } - - targetIP := *ipConfig.PrivateIPAddress - return targetIP, nil -} - -// splitProviderID converts a providerID to a NodeName. -func splitProviderID(providerID string) (types.NodeName, error) { - matches := providerIDRE.FindStringSubmatch(providerID) - if len(matches) != 2 { - return "", errors.New("error splitting providerID") - } - return types.NodeName(matches[1]), nil + return az.vmSet.GetIPByNodeName(string(nodeName), "") } var polyTable = crc32.MakeTable(crc32.Koopman) @@ -519,3 +369,333 @@ func ExtractDiskData(diskData interface{}) (provisioningState string, diskState } return provisioningState, diskState, nil } + +// availabilitySet implements VMSet interface for Azure availability sets. +type availabilitySet struct { + *Cloud +} + +// newStandardSet creates a new availabilitySet. +func newAvailabilitySet(az *Cloud) VMSet { + return &availabilitySet{ + Cloud: az, + } +} + +// GetInstanceIDByNodeName gets the cloud provider ID by node name. +// It must return ("", cloudprovider.InstanceNotFound) if the instance does +// not exist or is no longer running. +func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) { + var machine compute.VirtualMachine + var exists bool + var err error + + as.operationPollRateLimiter.Accept() + machine, exists, err = as.getVirtualMachine(types.NodeName(name)) + if err != nil { + if as.CloudProviderBackoff { + glog.V(2).Infof("InstanceID(%s) backing off", name) + machine, exists, err = as.GetVirtualMachineWithRetry(types.NodeName(name)) + if err != nil { + glog.V(2).Infof("InstanceID(%s) abort backoff", name) + return "", err + } + } else { + return "", err + } + } else if !exists { + return "", cloudprovider.InstanceNotFound + } + return *machine.ID, nil +} + +// GetNodeNameByProviderID gets the node name by provider ID. +func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) { + // NodeName is part of providerID for standard instances. + matches := providerIDRE.FindStringSubmatch(providerID) + if len(matches) != 2 { + return "", errors.New("error splitting providerID") + } + + return types.NodeName(matches[1]), nil +} + +// GetInstanceTypeByNodeName gets the instance type by node name. +func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) { + machine, exists, err := as.getVirtualMachine(types.NodeName(name)) + if err != nil { + glog.Errorf("error: as.GetInstanceTypeByNodeName(%s), as.getVirtualMachine(%s) err=%v", name, name, err) + return "", err + } else if !exists { + return "", cloudprovider.InstanceNotFound + } + + return string(machine.HardwareProfile.VMSize), nil +} + +// GetZoneByNodeName gets zone from instance view. +func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { + vm, err := as.VirtualMachinesClient.Get(as.ResourceGroup, name, compute.InstanceView) + if err != nil { + return cloudprovider.Zone{}, err + } + + failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain)) + zone := cloudprovider.Zone{ + FailureDomain: failureDomain, + Region: *(vm.Location), + } + return zone, nil +} + +// GetPrimaryVMSetName returns the VM set name depending on the configured vmType. +// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType. +func (as *availabilitySet) GetPrimaryVMSetName() string { + return as.Config.PrimaryAvailabilitySetName +} + +// GetIPByNodeName gets machine IP by node name. +func (as *availabilitySet) GetIPByNodeName(name, vmSetName string) (string, error) { + nic, err := as.GetPrimaryInterface(name, vmSetName) + if err != nil { + return "", err + } + + ipConfig, err := getPrimaryIPConfig(nic) + if err != nil { + glog.Errorf("error: as.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", name, nic, err) + return "", err + } + + targetIP := *ipConfig.PrivateIPAddress + return targetIP, nil +} + +// getAgentPoolAvailabiliySets lists the virtual machines for for the resource group and then builds +// a list of availability sets that match the nodes available to k8s. +func (as *availabilitySet) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) { + vms, err := as.VirtualMachineClientListWithRetry() + if err != nil { + glog.Errorf("as.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err) + return nil, err + } + vmNameToAvailabilitySetID := make(map[string]string, len(vms)) + for vmx := range vms { + vm := vms[vmx] + if vm.AvailabilitySet != nil { + vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID + } + } + availabilitySetIDs := sets.NewString() + agentPoolAvailabilitySets = &[]string{} + for nx := range nodes { + nodeName := (*nodes[nx]).Name + if isMasterNode(nodes[nx]) { + continue + } + asID, ok := vmNameToAvailabilitySetID[nodeName] + if !ok { + glog.Errorf("as.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName) + return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName) + } + if availabilitySetIDs.Has(asID) { + // already added in the list + continue + } + asName, err := getLastSegment(asID) + if err != nil { + glog.Errorf("as.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err) + return nil, err + } + // AvailabilitySet ID is currently upper cased in a indeterministic way + // We want to keep it lower case, before the ID get fixed + asName = strings.ToLower(asName) + + *agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName) + } + + return agentPoolAvailabilitySets, nil +} + +// GetVMSetNames selects all possible availability sets or scale sets +// (depending vmType configured) for service load balancer, if the service has +// no loadbalancer mode annotaion returns the primary VMSet. If service annotation +// for loadbalancer exists then return the eligible VMSet. +func (as *availabilitySet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) { + hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service) + if !hasMode { + // no mode specified in service annotation default to PrimaryAvailabilitySetName + availabilitySetNames = &[]string{as.Config.PrimaryAvailabilitySetName} + return availabilitySetNames, nil + } + availabilitySetNames, err = as.getAgentPoolAvailabiliySets(nodes) + if err != nil { + glog.Errorf("as.GetVMSetNames - getAgentPoolAvailabiliySets failed err=(%v)", err) + return nil, err + } + if len(*availabilitySetNames) == 0 { + glog.Errorf("as.GetVMSetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes)) + return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes)) + } + // sort the list to have deterministic selection + sort.Strings(*availabilitySetNames) + if !isAuto { + if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 { + return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value") + } + // validate availability set exists + var found bool + for sasx := range serviceAvailabilitySetNames { + for asx := range *availabilitySetNames { + if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) { + found = true + serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx] + break + } + } + if !found { + glog.Errorf("as.GetVMSetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx]) + return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx]) + } + } + availabilitySetNames = &serviceAvailabilitySetNames + } + + return availabilitySetNames, nil +} + +// GetPrimaryInterface gets machine primary network interface by node name and vmSet. +func (as *availabilitySet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) { + var machine compute.VirtualMachine + + as.operationPollRateLimiter.Accept() + glog.V(10).Infof("VirtualMachinesClient.Get(%q): start", nodeName) + machine, err := as.VirtualMachineClientGetWithRetry(as.ResourceGroup, nodeName, "") + if err != nil { + glog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName) + return network.Interface{}, err + } + glog.V(10).Infof("VirtualMachinesClient.Get(%q): end", nodeName) + + primaryNicID, err := getPrimaryInterfaceID(machine) + if err != nil { + return network.Interface{}, err + } + nicName, err := getLastSegment(primaryNicID) + if err != nil { + return network.Interface{}, err + } + + // Check availability set + if vmSetName != "" { + expectedAvailabilitySetName := as.getAvailabilitySetID(vmSetName) + if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) { + glog.V(3).Infof( + "GetPrimaryInterface: nic (%s) is not in the availabilitySet(%s)", nicName, vmSetName) + return network.Interface{}, errNotInVMSet + } + } + + as.operationPollRateLimiter.Accept() + glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) + nic, err := as.InterfacesClient.Get(as.ResourceGroup, nicName, "") + glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) + if err != nil { + return network.Interface{}, err + } + + return nic, nil +} + +// ensureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is +// participating in the specified LoadBalancer Backend Pool. +func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, vmSetName string) error { + vmName := mapNodeNameToVMName(nodeName) + nic, err := as.GetPrimaryInterface(vmName, vmSetName) + if err != nil { + if err == errNotInVMSet { + glog.V(3).Infof("ensureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName) + return nil + } + + glog.Errorf("error: az.ensureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err) + return err + } + + var primaryIPConfig *network.InterfaceIPConfiguration + primaryIPConfig, err = getPrimaryIPConfig(nic) + if err != nil { + return err + } + + foundPool := false + newBackendPools := []network.BackendAddressPool{} + if primaryIPConfig.LoadBalancerBackendAddressPools != nil { + newBackendPools = *primaryIPConfig.LoadBalancerBackendAddressPools + } + for _, existingPool := range newBackendPools { + if strings.EqualFold(backendPoolID, *existingPool.ID) { + foundPool = true + break + } + } + if !foundPool { + newBackendPools = append(newBackendPools, + network.BackendAddressPool{ + ID: to.StringPtr(backendPoolID), + }) + + primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools + + nicName := *nic.Name + glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName) + as.operationPollRateLimiter.Accept() + glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): start", *nic.Name) + respChan, errChan := as.InterfacesClient.CreateOrUpdate(as.ResourceGroup, *nic.Name, nic, nil) + resp := <-respChan + err := <-errChan + glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name) + if as.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) { + glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err) + retryErr := as.CreateOrUpdateInterfaceWithRetry(nic) + if retryErr != nil { + err = retryErr + glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName) + } + } + if err != nil { + return err + } + } + return nil +} + +// EnsureHostsInPool ensures the given Node's primary IP configurations are +// participating in the specified LoadBalancer Backend Pool. +func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error { + hostUpdates := make([]func() error, len(nodes)) + for i, node := range nodes { + localNodeName := node.Name + f := func() error { + err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName) + if err != nil { + return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", serviceName, backendPoolID, err) + } + return nil + } + hostUpdates[i] = f + } + + errs := utilerrors.AggregateGoroutines(hostUpdates...) + if errs != nil { + return utilerrors.Flatten(errs) + } + + return nil +} + +// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet. +func (as *availabilitySet) EnsureBackendPoolDeleted(poolID, vmSetName string) error { + // Do nothing for availability set. + return nil +}