Support cross resource group load balancer.

This commit is contained in:
t-qini 2019-08-06 22:59:04 +08:00
parent 7f1ae0e32d
commit fcd0c6706f
3 changed files with 144 additions and 11 deletions

View File

@ -25,7 +25,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-08-01/network"
"github.com/Azure/go-autorest/autorest/to"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
@ -232,7 +232,8 @@ func (az *Cloud) CreateOrUpdateLB(service *v1.Service, lb network.LoadBalancer)
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb, to.String(lb.Etag))
rgName := az.getLoadBalancerResourceGroup(service)
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, rgName, *lb.Name, lb, to.String(lb.Etag))
klog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
if err == nil {
if isSuccessHTTPResponse(resp) {
@ -259,7 +260,8 @@ func (az *Cloud) createOrUpdateLBWithRetry(service *v1.Service, lb network.LoadB
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb, to.String(lb.Etag))
rgName := az.getLoadBalancerResourceGroup(service)
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, rgName, *lb.Name, lb, to.String(lb.Etag))
klog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateLoadBalancer", resp, err)
if done && err == nil {
@ -282,7 +284,8 @@ func (az *Cloud) ListLB(service *v1.Service) ([]network.LoadBalancer, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
allLBs, err := az.LoadBalancerClient.List(ctx, az.ResourceGroup)
rgName := az.getLoadBalancerResourceGroup(service)
allLBs, err := az.LoadBalancerClient.List(ctx, rgName)
if err != nil {
az.Event(service, v1.EventTypeWarning, "ListLoadBalancers", err.Error())
klog.Errorf("LoadBalancerClient.List(%v) failure with err=%v", az.ResourceGroup, err)
@ -304,7 +307,8 @@ func (az *Cloud) listLBWithRetry(service *v1.Service) ([]network.LoadBalancer, e
ctx, cancel := getContextWithCancel()
defer cancel()
allLBs, retryErr = az.LoadBalancerClient.List(ctx, az.ResourceGroup)
rgName := az.getLoadBalancerResourceGroup(service)
allLBs, retryErr = az.LoadBalancerClient.List(ctx, rgName)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "ListLoadBalancers", retryErr.Error())
klog.Errorf("LoadBalancerClient.List(%v) - backoff: failure, will retry,err=%v",

View File

@ -24,7 +24,7 @@ import (
"strconv"
"strings"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
servicehelpers "k8s.io/cloud-provider/service/helpers"
@ -93,6 +93,8 @@ const (
serviceTagKey = "service"
// clusterNameKey is the cluster name key applied for public IP tags.
clusterNameKey = "kubernetes-cluster-name"
// LoadBalancerName
LoadBalancerName = "load-balancer-name"
)
// GetLoadBalancer returns whether the specified load balancer exists, and
@ -219,10 +221,42 @@ func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName stri
// GetLoadBalancerName returns the LoadBalancer name.
func (az *Cloud) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
// TODO: replace DefaultLoadBalancerName to generate more meaningful loadbalancer names.
if lbName, found := service.Annotations[LoadBalancerName]; found && strings.TrimSpace(lbName) != "" {
return lbName
}
return cloudprovider.DefaultLoadBalancerName(service)
}
func (az *Cloud) getLoadBalancerResourceGroup(service *v1.Service) string {
if rgName, found := service.Annotations[ServiceAnnotationLoadBalancerResourceGroup]; found && strings.TrimSpace(rgName) != "" {
return rgName
}
return az.ResourceGroup
}
// getLoadBalancerByName get the specific lb with given load balancer name and resource group.
// It works with annotation "service.beta.kubernetes.io/azure-load-balancer-resource-group"
func (az *Cloud) getLoadBalancerByName(service *v1.Service, lbName string) (lb *network.LoadBalancer, status *v1.LoadBalancerStatus, exists bool, err error) {
existingLBs, err := az.ListLB(service)
if err != nil {
return nil, nil, false, err
}
if existingLBs == nil {
return nil, nil, false, fmt.Errorf("cannot obtain the load balancer %q", lbName)
}
for _, existingLB := range existingLBs {
if strings.EqualFold(*existingLB.Name, lbName) {
status, err = az.getServiceLoadBalancerStatus(service, &existingLB)
if err != nil {
return nil, nil, false, err
}
return &existingLB, status, true, err
}
}
return nil, nil, false, fmt.Errorf("cannot obtain the load balancer %q", lbName)
}
// getServiceLoadBalancer gets the loadbalancer for the service if it already exists.
// If wantLb is TRUE then -it selects a new load balancer.
// In case the selected load balancer does not exist it returns network.LoadBalancer struct
@ -595,14 +629,32 @@ func (az *Cloud) isFrontendIPChanged(clusterName string, config network.Frontend
// This entails adding rules/probes for expected Ports and removing stale rules/ports.
// nodes only used if wantLb is true
func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node, wantLb bool) (*network.LoadBalancer, error) {
var (
lb *network.LoadBalancer
exists bool
err error
rgName string
)
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
klog.V(2).Infof("reconcileLoadBalancer for service(%s) - wantLb(%t): started", serviceName, wantLb)
lb, _, _, err := az.getServiceLoadBalancer(service, clusterName, nodes, wantLb)
if err != nil {
klog.Errorf("reconcileLoadBalancer: failed to get load balancer for service %q, error: %v", serviceName, err)
return nil, err
if rgName = az.getLoadBalancerResourceGroup(service); rgName != az.ResourceGroup {
lbName := az.GetLoadBalancerName(context.Background(), clusterName, service)
lb, _, exists, err = az.getLoadBalancerByName(service, lbName)
if err != nil {
klog.Errorf("reconcileLoadBalancer: failed to get load balancer for service %q in resource group %q, error: %v, will continue in the default resource group", serviceName, rgName, err)
}
}
if !exists || strings.TrimSpace(rgName) == "" || strings.TrimSpace(rgName) == az.ResourceGroup {
lb, _, _, err = az.getServiceLoadBalancer(service, clusterName, nodes, wantLb)
if err != nil {
klog.Errorf("reconcileLoadBalancer: failed to get load balancer for service %q, error: %v", serviceName, err)
return nil, err
}
}
lbName := *lb.Name
klog.V(2).Infof("reconcileLoadBalancer for service(%s): lb(%s) wantLb(%t) resolved load balancer name", serviceName, lbName, wantLb)
lbFrontendIPConfigName := az.getFrontendIPConfigName(service, subnet(service))

View File

@ -676,6 +676,83 @@ func TestGetServiceLoadBalancer(t *testing.T) {
}
}
func TestGetLoadBalancerByName(t *testing.T) {
testCases := []struct {
desc string
existingLBs []network.LoadBalancer
expectedLB *network.LoadBalancer
expectedStatus *v1.LoadBalancerStatus
expectedExists bool
expectedError bool
}{
{
desc: "getLoadBalancerByName shall return the corresponding load balancer in the resource group different from the cluster's",
existingLBs: []network.LoadBalancer{
{
Name: to.StringPtr("me"),
LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{
FrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
Name: to.StringPtr("atest1"),
FrontendIPConfigurationPropertiesFormat: &network.FrontendIPConfigurationPropertiesFormat{
PublicIPAddress: &network.PublicIPAddress{ID: to.StringPtr("id1")},
},
},
},
},
},
{
Name: to.StringPtr("notMe"),
LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{
FrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
Name: to.StringPtr("atest1"),
FrontendIPConfigurationPropertiesFormat: &network.FrontendIPConfigurationPropertiesFormat{
PublicIPAddress: &network.PublicIPAddress{ID: to.StringPtr("id1")},
},
},
},
},
},
},
expectedLB: &network.LoadBalancer{
Name: to.StringPtr("me"),
LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{
FrontendIPConfigurations: &[]network.FrontendIPConfiguration{
{
Name: to.StringPtr("atest1"),
FrontendIPConfigurationPropertiesFormat: &network.FrontendIPConfigurationPropertiesFormat{
PublicIPAddress: &network.PublicIPAddress{ID: to.StringPtr("id1")},
},
},
},
},
},
expectedStatus: &v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: "", Hostname: ""}}},
expectedExists: true,
expectedError: false,
},
}
for i, test := range testCases {
az := getTestCloud()
service := getTestService("test1", v1.ProtocolTCP, nil, 80)
service.Annotations[ServiceAnnotationLoadBalancerResourceGroup] = "rg1"
for _, existingLB := range test.existingLBs {
_, err := az.LoadBalancerClient.CreateOrUpdate(context.TODO(), "rg1", *existingLB.Name, existingLB, "")
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
}
lb, status, exists, err := az.getLoadBalancerByName(&service, "me")
assert.Equal(t, test.expectedLB, lb, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedStatus, status, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedExists, exists, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
}
}
func TestIsFrontendIPChanged(t *testing.T) {
testCases := []struct {
desc string