Add availability sets implementation of VMSet interface

This commit is contained in:
Pengfei Ni 2017-12-13 14:17:37 +08:00
parent 7944bc3117
commit 906abde733

View File

@ -30,8 +30,10 @@ import (
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
)
@ -54,6 +56,7 @@ const (
nodeLabelRole = "kubernetes.io/role"
)
var errNotInVMSet = errors.New("vm is not in the vmset")
var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`)
// returns the full identifier of a machine
@ -133,115 +136,22 @@ func (az *Cloud) getpublicIPAddressID(pipName string) string {
pipName)
}
// getLoadBalancerAvailabilitySetNames selects all possible availability sets for
// service load balancer, if the service has no loadbalancer mode annotaion returns the
// primary availability set if service annotation for loadbalancer availability set
// exists then return the eligible a availability set
func (az *Cloud) getLoadBalancerAvailabilitySetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) {
hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service)
if !hasMode {
// no mode specified in service annotation default to PrimaryAvailabilitySetName
availabilitySetNames = &[]string{az.Config.PrimaryAvailabilitySetName}
return availabilitySetNames, nil
}
availabilitySetNames, err = az.getAgentPoolAvailabiliySets(nodes)
if err != nil {
glog.Errorf("az.getLoadBalancerAvailabilitySetNames - getAgentPoolAvailabiliySets failed err=(%v)", err)
return nil, err
}
if len(*availabilitySetNames) == 0 {
glog.Errorf("az.getLoadBalancerAvailabilitySetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes))
return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes))
}
// sort the list to have deterministic selection
sort.Strings(*availabilitySetNames)
if !isAuto {
if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 {
return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value")
}
// validate availability set exists
var found bool
for sasx := range serviceAvailabilitySetNames {
for asx := range *availabilitySetNames {
if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) {
found = true
serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx]
break
}
}
if !found {
glog.Errorf("az.getLoadBalancerAvailabilitySetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx])
return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx])
}
}
availabilitySetNames = &serviceAvailabilitySetNames
}
return availabilitySetNames, nil
}
// lists the virtual machines for for the resource group and then builds
// a list of availability sets that match the nodes available to k8s
func (az *Cloud) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) {
vms, err := az.VirtualMachineClientListWithRetry()
if err != nil {
glog.Errorf("az.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err)
return nil, err
}
vmNameToAvailabilitySetID := make(map[string]string, len(vms))
for vmx := range vms {
vm := vms[vmx]
if vm.AvailabilitySet != nil {
vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID
}
}
availabilitySetIDs := sets.NewString()
agentPoolAvailabilitySets = &[]string{}
for nx := range nodes {
nodeName := (*nodes[nx]).Name
if isMasterNode(nodes[nx]) {
continue
}
asID, ok := vmNameToAvailabilitySetID[nodeName]
if !ok {
glog.Errorf("az.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName)
return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName)
}
if availabilitySetIDs.Has(asID) {
// already added in the list
continue
}
asName, err := getLastSegment(asID)
if err != nil {
glog.Errorf("az.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err)
return nil, err
}
// AvailabilitySet ID is currently upper cased in a indeterministic way
// We want to keep it lower case, before the ID get fixed
asName = strings.ToLower(asName)
*agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName)
}
return agentPoolAvailabilitySets, nil
}
func (az *Cloud) mapLoadBalancerNameToAvailabilitySet(lbName string, clusterName string) (availabilitySetName string) {
availabilitySetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix)
func (az *Cloud) mapLoadBalancerNameToVMSet(lbName string, clusterName string) (vmSetName string) {
vmSetName = strings.TrimSuffix(lbName, InternalLoadBalancerNameSuffix)
if strings.EqualFold(clusterName, lbName) {
availabilitySetName = az.Config.PrimaryAvailabilitySetName
vmSetName = az.vmSet.GetPrimaryVMSetName()
}
return availabilitySetName
return vmSetName
}
// For a load balancer, all frontend ip should reference either a subnet or publicIpAddress.
// Thus Azure do not allow mixed type (public and internal) load balancer.
// So we'd have a separate name for internal load balancer.
// This would be the name for Azure LoadBalancer resource.
func (az *Cloud) getLoadBalancerName(clusterName string, availabilitySetName string, isInternal bool) string {
lbNamePrefix := availabilitySetName
if strings.EqualFold(availabilitySetName, az.Config.PrimaryAvailabilitySetName) {
func (az *Cloud) getLoadBalancerName(clusterName string, vmSetName string, isInternal bool) string {
lbNamePrefix := vmSetName
if strings.EqualFold(vmSetName, az.vmSet.GetPrimaryVMSetName()) {
lbNamePrefix = clusterName
}
if isInternal {
@ -402,67 +312,7 @@ outer:
}
func (az *Cloud) getIPForMachine(nodeName types.NodeName) (string, error) {
if az.Config.VMType == vmTypeVMSS {
ip, err := az.getIPForVmssMachine(nodeName)
if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
return az.getIPForStandardMachine(nodeName)
}
return ip, err
}
return az.getIPForStandardMachine(nodeName)
}
func (az *Cloud) getIPForStandardMachine(nodeName types.NodeName) (string, error) {
az.operationPollRateLimiter.Accept()
machine, exists, err := az.getVirtualMachine(nodeName)
if !exists {
return "", cloudprovider.InstanceNotFound
}
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), az.getVirtualMachine(%s), err=%v", nodeName, nodeName, err)
return "", err
}
nicID, err := getPrimaryInterfaceID(machine)
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err)
return "", err
}
nicName, err := getLastSegment(nicID)
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), getLastSegment(%s), err=%v", nodeName, nicID, err)
return "", err
}
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "")
glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), az.InterfacesClient.Get(%s, %s, %s), err=%v", nodeName, az.ResourceGroup, nicName, "", err)
return "", err
}
ipConfig, err := getPrimaryIPConfig(nic)
if err != nil {
glog.Errorf("error: az.getIPForMachine(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err)
return "", err
}
targetIP := *ipConfig.PrivateIPAddress
return targetIP, nil
}
// splitProviderID converts a providerID to a NodeName.
func splitProviderID(providerID string) (types.NodeName, error) {
matches := providerIDRE.FindStringSubmatch(providerID)
if len(matches) != 2 {
return "", errors.New("error splitting providerID")
}
return types.NodeName(matches[1]), nil
return az.vmSet.GetIPByNodeName(string(nodeName), "")
}
var polyTable = crc32.MakeTable(crc32.Koopman)
@ -519,3 +369,333 @@ func ExtractDiskData(diskData interface{}) (provisioningState string, diskState
}
return provisioningState, diskState, nil
}
// availabilitySet implements VMSet interface for Azure availability sets.
type availabilitySet struct {
*Cloud
}
// newStandardSet creates a new availabilitySet.
func newAvailabilitySet(az *Cloud) VMSet {
return &availabilitySet{
Cloud: az,
}
}
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
// It must return ("", cloudprovider.InstanceNotFound) if the instance does
// not exist or is no longer running.
func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) {
var machine compute.VirtualMachine
var exists bool
var err error
as.operationPollRateLimiter.Accept()
machine, exists, err = as.getVirtualMachine(types.NodeName(name))
if err != nil {
if as.CloudProviderBackoff {
glog.V(2).Infof("InstanceID(%s) backing off", name)
machine, exists, err = as.GetVirtualMachineWithRetry(types.NodeName(name))
if err != nil {
glog.V(2).Infof("InstanceID(%s) abort backoff", name)
return "", err
}
} else {
return "", err
}
} else if !exists {
return "", cloudprovider.InstanceNotFound
}
return *machine.ID, nil
}
// GetNodeNameByProviderID gets the node name by provider ID.
func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
// NodeName is part of providerID for standard instances.
matches := providerIDRE.FindStringSubmatch(providerID)
if len(matches) != 2 {
return "", errors.New("error splitting providerID")
}
return types.NodeName(matches[1]), nil
}
// GetInstanceTypeByNodeName gets the instance type by node name.
func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) {
machine, exists, err := as.getVirtualMachine(types.NodeName(name))
if err != nil {
glog.Errorf("error: as.GetInstanceTypeByNodeName(%s), as.getVirtualMachine(%s) err=%v", name, name, err)
return "", err
} else if !exists {
return "", cloudprovider.InstanceNotFound
}
return string(machine.HardwareProfile.VMSize), nil
}
// GetZoneByNodeName gets zone from instance view.
func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
vm, err := as.VirtualMachinesClient.Get(as.ResourceGroup, name, compute.InstanceView)
if err != nil {
return cloudprovider.Zone{}, err
}
failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))
zone := cloudprovider.Zone{
FailureDomain: failureDomain,
Region: *(vm.Location),
}
return zone, nil
}
// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType.
func (as *availabilitySet) GetPrimaryVMSetName() string {
return as.Config.PrimaryAvailabilitySetName
}
// GetIPByNodeName gets machine IP by node name.
func (as *availabilitySet) GetIPByNodeName(name, vmSetName string) (string, error) {
nic, err := as.GetPrimaryInterface(name, vmSetName)
if err != nil {
return "", err
}
ipConfig, err := getPrimaryIPConfig(nic)
if err != nil {
glog.Errorf("error: as.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", name, nic, err)
return "", err
}
targetIP := *ipConfig.PrivateIPAddress
return targetIP, nil
}
// getAgentPoolAvailabiliySets lists the virtual machines for for the resource group and then builds
// a list of availability sets that match the nodes available to k8s.
func (as *availabilitySet) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) {
vms, err := as.VirtualMachineClientListWithRetry()
if err != nil {
glog.Errorf("as.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err)
return nil, err
}
vmNameToAvailabilitySetID := make(map[string]string, len(vms))
for vmx := range vms {
vm := vms[vmx]
if vm.AvailabilitySet != nil {
vmNameToAvailabilitySetID[*vm.Name] = *vm.AvailabilitySet.ID
}
}
availabilitySetIDs := sets.NewString()
agentPoolAvailabilitySets = &[]string{}
for nx := range nodes {
nodeName := (*nodes[nx]).Name
if isMasterNode(nodes[nx]) {
continue
}
asID, ok := vmNameToAvailabilitySetID[nodeName]
if !ok {
glog.Errorf("as.getNodeAvailabilitySet - Node(%s) has no availability sets", nodeName)
return nil, fmt.Errorf("Node (%s) - has no availability sets", nodeName)
}
if availabilitySetIDs.Has(asID) {
// already added in the list
continue
}
asName, err := getLastSegment(asID)
if err != nil {
glog.Errorf("as.getNodeAvailabilitySet - Node (%s)- getLastSegment(%s), err=%v", nodeName, asID, err)
return nil, err
}
// AvailabilitySet ID is currently upper cased in a indeterministic way
// We want to keep it lower case, before the ID get fixed
asName = strings.ToLower(asName)
*agentPoolAvailabilitySets = append(*agentPoolAvailabilitySets, asName)
}
return agentPoolAvailabilitySets, nil
}
// GetVMSetNames selects all possible availability sets or scale sets
// (depending vmType configured) for service load balancer, if the service has
// no loadbalancer mode annotaion returns the primary VMSet. If service annotation
// for loadbalancer exists then return the eligible VMSet.
func (as *availabilitySet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (availabilitySetNames *[]string, err error) {
hasMode, isAuto, serviceAvailabilitySetNames := getServiceLoadBalancerMode(service)
if !hasMode {
// no mode specified in service annotation default to PrimaryAvailabilitySetName
availabilitySetNames = &[]string{as.Config.PrimaryAvailabilitySetName}
return availabilitySetNames, nil
}
availabilitySetNames, err = as.getAgentPoolAvailabiliySets(nodes)
if err != nil {
glog.Errorf("as.GetVMSetNames - getAgentPoolAvailabiliySets failed err=(%v)", err)
return nil, err
}
if len(*availabilitySetNames) == 0 {
glog.Errorf("as.GetVMSetNames - No availability sets found for nodes in the cluster, node count(%d)", len(nodes))
return nil, fmt.Errorf("No availability sets found for nodes, node count(%d)", len(nodes))
}
// sort the list to have deterministic selection
sort.Strings(*availabilitySetNames)
if !isAuto {
if serviceAvailabilitySetNames == nil || len(serviceAvailabilitySetNames) == 0 {
return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value")
}
// validate availability set exists
var found bool
for sasx := range serviceAvailabilitySetNames {
for asx := range *availabilitySetNames {
if strings.EqualFold((*availabilitySetNames)[asx], serviceAvailabilitySetNames[sasx]) {
found = true
serviceAvailabilitySetNames[sasx] = (*availabilitySetNames)[asx]
break
}
}
if !found {
glog.Errorf("as.GetVMSetNames - Availability set (%s) in service annotation not found", serviceAvailabilitySetNames[sasx])
return nil, fmt.Errorf("availability set (%s) - not found", serviceAvailabilitySetNames[sasx])
}
}
availabilitySetNames = &serviceAvailabilitySetNames
}
return availabilitySetNames, nil
}
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
func (as *availabilitySet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) {
var machine compute.VirtualMachine
as.operationPollRateLimiter.Accept()
glog.V(10).Infof("VirtualMachinesClient.Get(%q): start", nodeName)
machine, err := as.VirtualMachineClientGetWithRetry(as.ResourceGroup, nodeName, "")
if err != nil {
glog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName)
return network.Interface{}, err
}
glog.V(10).Infof("VirtualMachinesClient.Get(%q): end", nodeName)
primaryNicID, err := getPrimaryInterfaceID(machine)
if err != nil {
return network.Interface{}, err
}
nicName, err := getLastSegment(primaryNicID)
if err != nil {
return network.Interface{}, err
}
// Check availability set
if vmSetName != "" {
expectedAvailabilitySetName := as.getAvailabilitySetID(vmSetName)
if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) {
glog.V(3).Infof(
"GetPrimaryInterface: nic (%s) is not in the availabilitySet(%s)", nicName, vmSetName)
return network.Interface{}, errNotInVMSet
}
}
as.operationPollRateLimiter.Accept()
glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
nic, err := as.InterfacesClient.Get(as.ResourceGroup, nicName, "")
glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
if err != nil {
return network.Interface{}, err
}
return nic, nil
}
// ensureHostInPool ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool.
func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, vmSetName string) error {
vmName := mapNodeNameToVMName(nodeName)
nic, err := as.GetPrimaryInterface(vmName, vmSetName)
if err != nil {
if err == errNotInVMSet {
glog.V(3).Infof("ensureHostInPool skips node %s because it is not in the vmSet %s", nodeName, vmSetName)
return nil
}
glog.Errorf("error: az.ensureHostInPool(%s), az.vmSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err)
return err
}
var primaryIPConfig *network.InterfaceIPConfiguration
primaryIPConfig, err = getPrimaryIPConfig(nic)
if err != nil {
return err
}
foundPool := false
newBackendPools := []network.BackendAddressPool{}
if primaryIPConfig.LoadBalancerBackendAddressPools != nil {
newBackendPools = *primaryIPConfig.LoadBalancerBackendAddressPools
}
for _, existingPool := range newBackendPools {
if strings.EqualFold(backendPoolID, *existingPool.ID) {
foundPool = true
break
}
}
if !foundPool {
newBackendPools = append(newBackendPools,
network.BackendAddressPool{
ID: to.StringPtr(backendPoolID),
})
primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools
nicName := *nic.Name
glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName)
as.operationPollRateLimiter.Accept()
glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): start", *nic.Name)
respChan, errChan := as.InterfacesClient.CreateOrUpdate(as.ResourceGroup, *nic.Name, nic, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name)
if as.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err)
retryErr := as.CreateOrUpdateInterfaceWithRetry(nic)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName)
}
}
if err != nil {
return err
}
}
return nil
}
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string) error {
hostUpdates := make([]func() error, len(nodes))
for i, node := range nodes {
localNodeName := node.Name
f := func() error {
err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName)
if err != nil {
return fmt.Errorf("ensure(%s): backendPoolID(%s) - failed to ensure host in pool: %q", serviceName, backendPoolID, err)
}
return nil
}
hostUpdates[i] = f
}
errs := utilerrors.AggregateGoroutines(hostUpdates...)
if errs != nil {
return utilerrors.Flatten(errs)
}
return nil
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (as *availabilitySet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
// Do nothing for availability set.
return nil
}