Add load balancer implementation of vmSet

This commit is contained in:
Pengfei Ni 2017-12-13 14:20:41 +08:00 committed by Pengfei Ni
parent af5b079ef7
commit 86111df41d

View File

@ -23,23 +23,21 @@ import (
"strings"
"k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
serviceapi "k8s.io/kubernetes/pkg/api/v1/service"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
)
const (
// ServiceAnnotationLoadBalancerInternal is the annotation used on the service
const ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal"
ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal"
// ServiceAnnotationLoadBalancerInternalSubnet is the annotation used on the service
// to specify what subnet it is exposed on
const ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet"
ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet"
// ServiceAnnotationLoadBalancerMode is the annotation used on the service to specify the
// Azure load balancer selection based on availability sets
@ -50,14 +48,14 @@ const ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/
// is selected which has the miinimum rules associated with it.
// 3. "as1,as2" mode - this is when the laod balancer from the specified availability sets is selected that has the
// miinimum rules associated with it.
const ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode"
ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode"
// ServiceAnnotationLoadBalancerAutoModeValue the annotation used on the service to specify the
// Azure load balancer auto selection from the availability sets
const ServiceAnnotationLoadBalancerAutoModeValue = "__auto__"
ServiceAnnotationLoadBalancerAutoModeValue = "__auto__"
// ServiceAnnotationDNSLabelName annotation speficying the DNS label name for the service.
const ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name"
ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name"
// ServiceAnnotationSharedSecurityRule is the annotation used on the service
// to specify that the service should be exposed using an Azure security rule
@ -65,7 +63,8 @@ const ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-labe
// increase in the number of services that can be exposed. This relies on the
// Azure "augmented security rules" feature which at the time of writing is in
// preview and available only in certain regions.
const ServiceAnnotationSharedSecurityRule = "service.beta.kubernetes.io/azure-shared-securityrule"
ServiceAnnotationSharedSecurityRule = "service.beta.kubernetes.io/azure-shared-securityrule"
)
// GetLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is.
@ -166,15 +165,16 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi
return nil
}
// getServiceLoadBalancer gets the loadbalancer for the service if it already exists
// If wantLb is TRUE then -it selects a new load balancer
// getServiceLoadBalancer gets the loadbalancer for the service if it already exists.
// If wantLb is TRUE then -it selects a new load balancer.
// In case the selected load balancer does not exists it returns network.LoadBalancer struct
// with added metadata (such as name, location) and existsLB set to FALSE
// By default - cluster default LB is returned
// with added metadata (such as name, location) and existsLB set to FALSE.
// By default - cluster default LB is returned.
func (az *Cloud) getServiceLoadBalancer(service *v1.Service, clusterName string, nodes []*v1.Node, wantLb bool) (lb *network.LoadBalancer, status *v1.LoadBalancerStatus, exists bool, err error) {
isInternal := requiresInternalLoadBalancer(service)
var defaultLB *network.LoadBalancer
defaultLBName := az.getLoadBalancerName(clusterName, az.Config.PrimaryAvailabilitySetName, isInternal)
primaryVMSetName := az.vmSet.GetPrimaryVMSetName()
defaultLBName := az.getLoadBalancerName(clusterName, primaryVMSetName, isInternal)
existingLBs, err := az.ListLBWithRetry()
if err != nil {
@ -234,18 +234,19 @@ func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, exi
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
glog.V(3).Infof("selectLoadBalancer(%s): isInternal(%s) - start", serviceName, isInternal)
availabilitySetNames, err := az.getLoadBalancerAvailabilitySetNames(service, nodes)
vmSetNames, err := az.vmSet.GetVMSetNames(service, nodes)
if err != nil {
glog.Errorf("az.selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - az.getLoadBalancerAvailabilitySetNames failed, err=(%v)", clusterName, serviceName, isInternal, err)
glog.Errorf("az.selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - az.GetVMSetNames failed, err=(%v)", clusterName, serviceName, isInternal, err)
return nil, false, err
}
glog.Infof("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - availabilitysetsnames %v", clusterName, serviceName, isInternal, *availabilitySetNames)
glog.Infof("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - vmSetNames %v", clusterName, serviceName, isInternal, *vmSetNames)
mapExistingLBs := map[string]network.LoadBalancer{}
for _, lb := range *existingLBs {
mapExistingLBs[*lb.Name] = lb
}
selectedLBRuleCount := math.MaxInt32
for _, currASName := range *availabilitySetNames {
for _, currASName := range *vmSetNames {
currLBName := az.getLoadBalancerName(clusterName, currASName, isInternal)
lb, exists := mapExistingLBs[currLBName]
if !exists {
@ -272,13 +273,13 @@ func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, exi
}
if selectedLB == nil {
err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - unable to find load balancer for selected availability sets %v", clusterName, serviceName, isInternal, *availabilitySetNames)
err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - unable to find load balancer for selected VM sets %v", clusterName, serviceName, isInternal, *vmSetNames)
glog.Error(err)
return nil, false, err
}
// validate if the selected LB has not exceeded the MaximumLoadBalancerRuleCount
if az.Config.MaximumLoadBalancerRuleCount != 0 && selectedLBRuleCount >= az.Config.MaximumLoadBalancerRuleCount {
err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - all available load balancers have exceeded maximum rule limit %d, availabilitysetnames (%v)", clusterName, serviceName, isInternal, selectedLBRuleCount, *availabilitySetNames)
err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - all available load balancers have exceeded maximum rule limit %d, vmSetNames (%v)", clusterName, serviceName, isInternal, selectedLBRuleCount, *vmSetNames)
glog.Error(err)
return selectedLB, existsLb, err
}
@ -741,6 +742,11 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
// because an Azure load balancer cannot have an empty FrontendIPConfigurations collection
glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
// Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB.
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName)
// Remove the LB.
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.Delete(%q): start", lbName)
err := az.DeleteLBWithRetry(lbName)
@ -761,23 +767,10 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
if wantLb && nodes != nil {
// Add the machines to the backend pool if they're not already
availabilitySetName := az.mapLoadBalancerNameToAvailabilitySet(lbName, clusterName)
hostUpdates := make([]func() error, len(nodes))
for i, node := range nodes {
localNodeName := node.Name
f := func() error {
err := az.ensureHostInPool(serviceName, types.NodeName(localNodeName), lbBackendPoolID, availabilitySetName)
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
err := az.vmSet.EnsureHostsInPool(serviceName, nodes, lbBackendPoolID, vmSetName)
if err != nil {
return fmt.Errorf("ensure(%s): lb(%s) - failed to ensure host in pool: %q", serviceName, lbName, err)
}
return nil
}
hostUpdates[i] = f
}
errs := utilerrors.AggregateGoroutines(hostUpdates...)
if errs != nil {
return nil, utilerrors.Flatten(errs)
return nil, err
}
}
@ -1246,95 +1239,6 @@ func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) b
return false
}
// This ensures the given VM's Primary NIC's Primary IP Configuration is
// participating in the specified LoadBalancer Backend Pool.
func (az *Cloud) ensureHostInPool(serviceName string, nodeName types.NodeName, backendPoolID string, availabilitySetName string) error {
var machine compute.VirtualMachine
vmName := mapNodeNameToVMName(nodeName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("VirtualMachinesClient.Get(%q): start", vmName)
machine, err := az.VirtualMachineClientGetWithRetry(az.ResourceGroup, vmName, "")
if err != nil {
glog.V(2).Infof("ensureHostInPool(%s, %s, %s) abort backoff", serviceName, nodeName, backendPoolID)
return err
}
glog.V(10).Infof("VirtualMachinesClient.Get(%q): end", vmName)
primaryNicID, err := getPrimaryInterfaceID(machine)
if err != nil {
return err
}
nicName, err := getLastSegment(primaryNicID)
if err != nil {
return err
}
// Check availability set
if availabilitySetName != "" {
expectedAvailabilitySetName := az.getAvailabilitySetID(availabilitySetName)
if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) {
glog.V(3).Infof(
"nicupdate(%s): skipping nic (%s) since it is not in the availabilitySet(%s)",
serviceName, nicName, availabilitySetName)
return nil
}
}
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "")
glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
if err != nil {
return err
}
var primaryIPConfig *network.InterfaceIPConfiguration
primaryIPConfig, err = getPrimaryIPConfig(nic)
if err != nil {
return err
}
foundPool := false
newBackendPools := []network.BackendAddressPool{}
if primaryIPConfig.LoadBalancerBackendAddressPools != nil {
newBackendPools = *primaryIPConfig.LoadBalancerBackendAddressPools
}
for _, existingPool := range newBackendPools {
if strings.EqualFold(backendPoolID, *existingPool.ID) {
foundPool = true
break
}
}
if !foundPool {
newBackendPools = append(newBackendPools,
network.BackendAddressPool{
ID: to.StringPtr(backendPoolID),
})
primaryIPConfig.LoadBalancerBackendAddressPools = &newBackendPools
glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName)
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): start", *nic.Name)
respChan, errChan := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil)
resp := <-respChan
err := <-errChan
glog.V(10).Infof("InterfacesClient.CreateOrUpdate(%q): end", *nic.Name)
if az.CloudProviderBackoff && shouldRetryAPIRequest(resp.Response, err) {
glog.V(2).Infof("nicupdate(%s) backing off: nic(%s) - updating, err=%v", serviceName, nicName, err)
retryErr := az.CreateOrUpdateInterfaceWithRetry(nic)
if retryErr != nil {
err = retryErr
glog.V(2).Infof("nicupdate(%s) abort backoff: nic(%s) - updating", serviceName, nicName)
}
}
if err != nil {
return err
}
}
return nil
}
// Check if service requires an internal load balancer.
func requiresInternalLoadBalancer(service *v1.Service) bool {
if l, ok := service.Annotations[ServiceAnnotationLoadBalancerInternal]; ok {
@ -1354,28 +1258,28 @@ func subnet(service *v1.Service) *string {
return nil
}
// getServiceLoadBalancerMode parses the mode value
// if the value is __auto__ it returns isAuto = TRUE
// if anything else it returns the unique availability set names after triming spaces
func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, availabilitySetNames []string) {
// getServiceLoadBalancerMode parses the mode value.
// if the value is __auto__ it returns isAuto = TRUE.
// if anything else it returns the unique VM set names after triming spaces.
func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, vmSetNames []string) {
mode, hasMode := service.Annotations[ServiceAnnotationLoadBalancerMode]
mode = strings.TrimSpace(mode)
isAuto = strings.EqualFold(mode, ServiceAnnotationLoadBalancerAutoModeValue)
if !isAuto {
// Break up list of "AS1,AS2"
availabilitySetParsedList := strings.Split(mode, ",")
vmSetParsedList := strings.Split(mode, ",")
// Trim the availability set names and remove duplicates
// Trim the VM set names and remove duplicates
// e.g. {"AS1"," AS2", "AS3", "AS3"} => {"AS1", "AS2", "AS3"}
availabilitySetNameSet := sets.NewString()
for _, v := range availabilitySetParsedList {
availabilitySetNameSet.Insert(strings.TrimSpace(v))
vmSetNameSet := sets.NewString()
for _, v := range vmSetParsedList {
vmSetNameSet.Insert(strings.TrimSpace(v))
}
availabilitySetNames = availabilitySetNameSet.List()
vmSetNames = vmSetNameSet.List()
}
return hasMode, isAuto, availabilitySetNames
return hasMode, isAuto, vmSetNames
}
func useSharedSecurityRule(service *v1.Service) bool {