Add standard LB support to Azure vmss

This commit is contained in:
Pengfei Ni 2018-04-17 13:08:11 +08:00
parent f1d7156c88
commit 87776025af
3 changed files with 162 additions and 51 deletions

View File

@ -1244,7 +1244,7 @@ func (f *fakeVMSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
return fmt.Errorf("unimplemented") return fmt.Errorf("unimplemented")
} }
func (f *fakeVMSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error { func (f *fakeVMSet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
return fmt.Errorf("unimplemented") return fmt.Errorf("unimplemented")
} }

View File

@ -56,7 +56,7 @@ type VMSet interface {
// participating in the specified LoadBalancer Backend Pool. // participating in the specified LoadBalancer Backend Pool.
EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet. // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
EnsureBackendPoolDeleted(poolID, vmSetName string) error EnsureBackendPoolDeleted(poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun. // AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error

View File

@ -31,6 +31,7 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
) )
@ -435,7 +436,8 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Int
} }
// Check scale set name. // Check scale set name.
if vmSetName != "" && !strings.EqualFold(ssName, vmSetName) { // Backends of Standard load balancer could belong to multiple VMSS, so we don't check vmSet for it.
if vmSetName != "" && !strings.EqualFold(ssName, vmSetName) && !ss.useStandardLoadBalancer() {
return network.Interface{}, errNotInVMSet return network.Interface{}, errNotInVMSet
} }
@ -545,9 +547,39 @@ func (ss *scaleSet) updateVMSSInstancesWithRetry(scaleSetName string, vmInstance
}) })
} }
// EnsureHostsInPool ensures the given Node's primary IP configurations are // getNodesScaleSets returns scalesets with instanceIDs and standard node names for given nodes.
// participating in the specified LoadBalancer Backend Pool. func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String, []*v1.Node, error) {
func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error { scalesets := make(map[string]sets.String)
standardNodes := []*v1.Node{}
for _, curNode := range nodes {
curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID)
if err != nil {
glog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name)
standardNodes = append(standardNodes, curNode)
continue
}
if _, ok := scalesets[curScaleSetName]; !ok {
scalesets[curScaleSetName] = sets.NewString()
}
instanceID, err := getLastSegment(curNode.Spec.ProviderID)
if err != nil {
glog.Errorf("Failed to get instance ID for node %q: %v", curNode.Spec.ProviderID, err)
return nil, nil, err
}
scalesets[curScaleSetName].Insert(instanceID)
}
return scalesets, standardNodes, nil
}
// ensureHostsInVMSetPool ensures the given Node's primary IP configurations are
// participating in the vmSet's LoadBalancer Backend Pool.
func (ss *scaleSet) ensureHostsInVMSetPool(serviceName string, backendPoolID string, vmSetName string, instanceIDs []string, isInternal bool) error {
glog.V(3).Infof("ensuring hosts %q of scaleset %q in LB backendpool %q", instanceIDs, vmSetName, backendPoolID)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName) virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName)
if err != nil { if err != nil {
glog.Errorf("ss.getScaleSetWithRetry(%s) for service %q failed: %v", vmSetName, serviceName, err) glog.Errorf("ss.getScaleSetWithRetry(%s) for service %q failed: %v", vmSetName, serviceName, err)
@ -585,6 +617,22 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
} }
} }
if !foundPool { if !foundPool {
if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 {
// Although standard load balancer supports backends from multiple vmss,
// the same network interface couldn't be added to more than one load balancer of
// the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain
// about this.
backendPool := *newBackendPools[0].ID
matches := backendPoolIDRE.FindStringSubmatch(backendPool)
if len(matches) == 2 {
lbName := matches[1]
if strings.HasSuffix(lbName, InternalLoadBalancerNameSuffix) == isInternal {
glog.V(4).Infof("vmss %q has already been added to LB %q, omit adding it to a new one", vmSetName, lbName)
return nil
}
}
}
newBackendPools = append(newBackendPools, newBackendPools = append(newBackendPools,
computepreview.SubResource{ computepreview.SubResource{
ID: to.StringPtr(backendPoolID), ID: to.StringPtr(backendPoolID),
@ -609,28 +657,6 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
} }
} }
// Construct instanceIDs from nodes.
instanceIDs := []string{}
for _, curNode := range nodes {
curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID)
if err != nil {
glog.V(4).Infof("Node %q is not belonging to any scale sets, omitting it", curNode.Name)
continue
}
if curScaleSetName != vmSetName {
glog.V(4).Infof("Node %q is not belonging to scale set %q, omitting it", curNode.Name, vmSetName)
continue
}
instanceID, err := getLastSegment(curNode.Spec.ProviderID)
if err != nil {
glog.Errorf("Failed to get last segment from %q: %v", curNode.Spec.ProviderID, err)
return err
}
instanceIDs = append(instanceIDs, instanceID)
}
// Update instances to latest VMSS model. // Update instances to latest VMSS model.
vmInstanceIDs := computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs{ vmInstanceIDs := computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs, InstanceIds: &instanceIDs,
@ -654,27 +680,66 @@ func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, back
return nil return nil
} }
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet. // EnsureHostsInPool ensures the given Node's primary IP configurations are
func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error { // participating in the specified LoadBalancer Backend Pool.
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(vmSetName) func (ss *scaleSet) EnsureHostsInPool(serviceName string, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
scalesets, standardNodes, err := ss.getNodesScaleSets(nodes)
if err != nil { if err != nil {
glog.Errorf("ss.EnsureBackendPoolDeleted(%s, %s) getScaleSetWithRetry(%s) failed: %v", poolID, vmSetName, vmSetName, err) glog.Errorf("getNodesScaleSets() for service %q failed: %v", serviceName, err)
return err
}
for ssName, instanceIDs := range scalesets {
// Only add nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && ssName != vmSetName {
continue
}
if instanceIDs.Len() == 0 {
// This may happen when scaling a vmss capacity to 0.
return fmt.Errorf("no nodes running are belonging to vmss %q", ssName)
}
err := ss.ensureHostsInVMSetPool(serviceName, backendPoolID, ssName, instanceIDs.List(), isInternal)
if err != nil {
glog.Errorf("ensureHostsInVMSetPool() with scaleSet %q for service %q failed: %v", ssName, serviceName, err)
return err
}
}
if ss.useStandardLoadBalancer() && len(standardNodes) > 0 {
err := ss.availabilitySet.EnsureHostsInPool(serviceName, standardNodes, backendPoolID, "", isInternal)
if err != nil {
glog.Errorf("availabilitySet.EnsureHostsInPool() for service %q failed: %v", serviceName, err)
return err
}
}
return nil
}
// ensureScaleSetBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified scaleset.
func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(poolID, ssName string) error {
glog.V(3).Infof("ensuring backend pool %q deleted from scaleset %q", poolID, ssName)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(ssName)
if err != nil {
glog.Errorf("ss.ensureScaleSetBackendPoolDeleted(%s, %s) getScaleSetWithRetry(%s) failed: %v", poolID, ssName, ssName, err)
return err return err
} }
if !exists { if !exists {
glog.V(2).Infof("ss.EnsureBackendPoolDeleted(%s, %s), scale set %s has already been non-exist", poolID, vmSetName, vmSetName) glog.V(2).Infof("ss.ensureScaleSetBackendPoolDeleted(%s, %s), scale set %s has already been non-exist", poolID, ssName, ssName)
return nil return nil
} }
// Find primary network interface configuration. // Find primary network interface configuration.
networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName) primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, ssName)
if err != nil { if err != nil {
return err return err
} }
// Find primary IP configuration. // Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName) primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, ssName)
if err != nil { if err != nil {
return err return err
} }
@ -689,7 +754,7 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
for i := len(existingBackendPools) - 1; i >= 0; i-- { for i := len(existingBackendPools) - 1; i >= 0; i-- {
curPool := existingBackendPools[i] curPool := existingBackendPools[i]
if strings.EqualFold(poolID, *curPool.ID) { if strings.EqualFold(poolID, *curPool.ID) {
glog.V(10).Infof("EnsureBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, vmSetName) glog.V(10).Infof("ensureScaleSetBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, ssName)
foundPool = true foundPool = true
newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...) newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...)
} }
@ -701,17 +766,17 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
// Update scale set with backoff. // Update scale set with backoff.
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", vmSetName) glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", ssName)
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmSetName, virtualMachineScaleSet) resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, ssName, virtualMachineScaleSet)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName) glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", vmSetName, err) glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet) retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
if retryErr != nil { if retryErr != nil {
err = retryErr err = retryErr
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", vmSetName) glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
} }
} }
if err != nil { if err != nil {
@ -723,14 +788,16 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
vmInstanceIDs := computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs{ vmInstanceIDs := computepreview.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs, InstanceIds: &instanceIDs,
} }
instanceResp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, vmSetName, vmInstanceIDs) instanceCtx, instanceCancel := getContextWithCancel()
glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", vmSetName) defer instanceCancel()
instanceResp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(instanceCtx, ss.ResourceGroup, ssName, vmInstanceIDs)
glog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) { if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances scale set (%s) - updating, err=%v", vmSetName, err) glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.updateVMSSInstancesWithRetry(vmSetName, vmInstanceIDs) retryErr := ss.updateVMSSInstancesWithRetry(ssName, vmInstanceIDs)
if retryErr != nil { if retryErr != nil {
err = retryErr err = retryErr
glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances abort backoff: scale set (%s) - updating", vmSetName) glog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances abort backoff: scale set (%s) - updating", ssName)
} }
} }
if err != nil { if err != nil {
@ -740,14 +807,16 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
// Update virtualMachineScaleSet again. This is a workaround for removing VMSS reference from LB. // Update virtualMachineScaleSet again. This is a workaround for removing VMSS reference from LB.
// TODO: remove this workaround when figuring out the root cause. // TODO: remove this workaround when figuring out the root cause.
if len(newBackendPools) == 0 { if len(newBackendPools) == 0 {
glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating second time", vmSetName) updateCtx, updateCancel := getContextWithCancel()
resp, err = ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmSetName, virtualMachineScaleSet) defer updateCancel()
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName) glog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating second time", ssName)
resp, err = ss.VirtualMachineScaleSetsClient.CreateOrUpdate(updateCtx, ss.ResourceGroup, ssName, virtualMachineScaleSet)
glog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) { if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", vmSetName, err) glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet) retryErr := ss.createOrUpdateVMSSWithRetry(virtualMachineScaleSet)
if retryErr != nil { if retryErr != nil {
glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", vmSetName) glog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
} }
} }
} }
@ -755,6 +824,48 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string) error {
return nil return nil
} }
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
if backendAddressPools == nil {
return nil
}
scalesets := sets.NewString()
for _, backendPool := range *backendAddressPools {
if strings.EqualFold(*backendPool.ID, poolID) && backendPool.BackendIPConfigurations != nil {
for _, ipConfigurations := range *backendPool.BackendIPConfigurations {
if ipConfigurations.ID == nil {
continue
}
ssName, err := extractScaleSetNameByProviderID(*ipConfigurations.ID)
if err != nil {
glog.V(4).Infof("backend IP configuration %q is not belonging to any vmss, omit it")
continue
}
scalesets.Insert(ssName)
}
break
}
}
for ssName := range scalesets {
// Only remove nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && ssName != vmSetName {
continue
}
err := ss.ensureScaleSetBackendPoolDeleted(poolID, ssName)
if err != nil {
glog.Errorf("ensureScaleSetBackendPoolDeleted() with scaleSet %q failed: %v", ssName, err)
return err
}
}
return nil
}
// getVmssMachineID returns the full identifier of a vmss virtual machine. // getVmssMachineID returns the full identifier of a vmss virtual machine.
func (az *Cloud) getVmssMachineID(scaleSetName, instanceID string) string { func (az *Cloud) getVmssMachineID(scaleSetName, instanceID string) string {
return fmt.Sprintf( return fmt.Sprintf(