diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index bd3fcf28ac2..5987fb409e9 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -35,6 +35,10 @@ import ( // ServiceAnnotationLoadBalancerInternal is the annotation used on the service const ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal" +// ServiceAnnotationLoadBalancerInternalSubnet is the annotation used on the service +// to specify what subnet it is exposed on +const ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet" + // GetLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. func (az *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) { @@ -54,7 +58,7 @@ func (az *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (statu var lbIP *string if isInternal { - lbFrontendIPConfigName := getFrontendIPConfigName(service) + lbFrontendIPConfigName := getFrontendIPConfigName(service, subnet(service)) for _, ipConfiguration := range *lb.FrontendIPConfigurations { if lbFrontendIPConfigName == *ipConfiguration.Name { lbIP = ipConfiguration.PrivateIPAddress @@ -182,7 +186,11 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod var fipConfigurationProperties *network.FrontendIPConfigurationPropertiesFormat if isInternal { - subnet, existsSubnet, err := az.getSubnet(az.VnetName, az.SubnetName) + subnetName := subnet(service) + if subnetName == nil { + subnetName = &az.SubnetName + } + subnet, existsSubnet, err := az.getSubnet(az.VnetName, *subnetName) if err != nil { return nil, err } @@ -366,7 +374,7 @@ func (az *Cloud) cleanupLoadBalancer(clusterName string, service *v1.Service, is if !isInternalLb { // Find public ip resource to clean up from IP configuration - lbFrontendIPConfigName := getFrontendIPConfigName(service) + lbFrontendIPConfigName := getFrontendIPConfigName(service, subnet(service)) for _, config := range *lb.FrontendIPConfigurations { if strings.EqualFold(*config.Name, lbFrontendIPConfigName) { if config.PublicIPAddress != nil { @@ -523,7 +531,7 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration isInternal := requiresInternalLoadBalancer(service) lbName := getLoadBalancerName(clusterName, isInternal) serviceName := getServiceName(service) - lbFrontendIPConfigName := getFrontendIPConfigName(service) + lbFrontendIPConfigName := getFrontendIPConfigName(service, subnet(service)) lbFrontendIPConfigID := az.getFrontendIPConfigID(lbName, lbFrontendIPConfigName) lbBackendPoolName := getBackendPoolName(clusterName) lbBackendPoolID := az.getBackendPoolID(lbName, lbBackendPoolName) @@ -575,6 +583,14 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration } } } else { + for i := len(newConfigs) - 1; i >= 0; i-- { + config := newConfigs[i] + if serviceOwnsFrontEndIP(config, service) && !strings.EqualFold(*config.Name, lbFrontendIPConfigName) { + glog.V(3).Infof("reconcile(%s)(%t): lb frontendconfig(%s) - dropping", serviceName, wantLb, *config.Name) + newConfigs = append(newConfigs[:i], newConfigs[i+1:]...) + dirtyConfigs = true + } + } foundConfig := false for _, config := range newConfigs { if strings.EqualFold(*config.Name, lbFrontendIPConfigName) { @@ -608,7 +624,7 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration var expectedProbes []network.Probe var expectedRules []network.LoadBalancingRule for _, port := range ports { - lbRuleName := getLoadBalancerRuleName(service, port) + lbRuleName := getLoadBalancerRuleName(service, port, subnet(service)) transportProto, _, probeProto, err := getProtocolsFromKubernetesProtocol(port.Protocol) if err != nil { @@ -652,6 +668,7 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP { loadDistribution = network.SourceIP } + expectedRule := network.LoadBalancingRule{ Name: &lbRuleName, LoadBalancingRulePropertiesFormat: &network.LoadBalancingRulePropertiesFormat{ @@ -797,7 +814,7 @@ func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName st } for j := range sourceAddressPrefixes { ix := i*len(sourceAddressPrefixes) + j - securityRuleName := getSecurityRuleName(service, port, sourceAddressPrefixes[j]) + securityRuleName := getSecurityRuleName(service, port, subnet(service), sourceAddressPrefixes[j]) expectedSecurityRules[ix] = network.SecurityRule{ Name: to.StringPtr(securityRuleName), SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{ @@ -994,3 +1011,10 @@ func requiresInternalLoadBalancer(service *v1.Service) bool { return false } + +func subnet(service *v1.Service) *string { + if l, ok := service.Annotations[ServiceAnnotationLoadBalancerInternalSubnet]; ok { + return &l + } + return nil +} diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index 32ea4c68e07..e8e1525a502 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -67,6 +67,104 @@ func TestReconcileLoadBalancerAddPort(t *testing.T) { validateLoadBalancer(t, lb, svc) } +// Test additional of a new service/port on an internal LB with a subnet. +func TestReconcileLoadBalancerAddPortOnInternalSubnet(t *testing.T) { + az := getTestCloud() + svc := getTestService("servicea", v1.ProtocolTCP, 80) + addTestSubnet(&svc) + configProperties := getTestPublicFipConfigurationProperties() + lb := getTestLoadBalancer() + nodes := []*v1.Node{} + + svc.Spec.Ports = append(svc.Spec.Ports, v1.ServicePort{ + Name: fmt.Sprintf("port-udp-%d", 1234), + Protocol: v1.ProtocolUDP, + Port: 1234, + NodePort: getBackendPort(1234), + }) + + lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + if !updated { + t.Error("Expected the loadbalancer to need an update") + } + + // ensure we got a frontend ip configuration + if len(*lb.FrontendIPConfigurations) != 1 { + t.Error("Expected the loadbalancer to have a frontend ip configuration") + } + + validateLoadBalancer(t, lb, svc) +} + +// Test addition of services on an internal LB using both default and explicit subnets. +func TestReconcileLoadBalancerAddServicesOnMultipleSubnets(t *testing.T) { + az := getTestCloud() + svc1 := getTestService("service1", v1.ProtocolTCP, 8081) + svc2 := getTestService("service2", v1.ProtocolTCP, 8081) + addTestSubnet(&svc2) + configProperties := getTestPublicFipConfigurationProperties() + lb := getTestLoadBalancer() + nodes := []*v1.Node{} + + lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc1, nodes) + if err != nil { + t.Errorf("Unexpected error reconciling svc1: %q", err) + } + + lb, updated, err = az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc2, nodes) + if err != nil { + t.Errorf("Unexpected error reconciling svc2: %q", err) + } + + if !updated { + t.Error("Expected the loadbalancer to need an update") + } + + // ensure we got a frontend ip configuration for each service + if len(*lb.FrontendIPConfigurations) != 2 { + t.Error("Expected the loadbalancer to have 2 frontend ip configurations") + } + + validateLoadBalancer(t, lb, svc1, svc2) +} + +// Test moving a service exposure from one subnet to another. +func TestReconcileLoadBalancerEditServiceSubnet(t *testing.T) { + az := getTestCloud() + svc := getTestService("service1", v1.ProtocolTCP, 8081) + addTestSubnet(&svc) + configProperties := getTestPublicFipConfigurationProperties() + lb := getTestLoadBalancer() + nodes := []*v1.Node{} + + lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes) + if err != nil { + t.Errorf("Unexpected error reconciling initial svc: %q", err) + } + + svc.Annotations[ServiceAnnotationLoadBalancerInternalSubnet] = "NewSubnet" + + lb, updated, err = az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes) + if err != nil { + t.Errorf("Unexpected error reconciling edits to svc: %q", err) + } + + if !updated { + t.Error("Expected the loadbalancer to need an update") + } + + // ensure we got a frontend ip configuration for the service + if len(*lb.FrontendIPConfigurations) != 1 { + t.Error("Expected the loadbalancer to have 1 frontend ip configuration") + } + + validateLoadBalancer(t, lb, svc) +} + func TestReconcileLoadBalancerNodeHealth(t *testing.T) { az := getTestCloud() svc := getTestService("servicea", v1.ProtocolTCP, 80) @@ -337,7 +435,7 @@ func getTestLoadBalancer(services ...v1.Service) network.LoadBalancer { for _, service := range services { for _, port := range service.Spec.Ports { - ruleName := getLoadBalancerRuleName(&service, port) + ruleName := getLoadBalancerRuleName(&service, port, nil) rules = append(rules, network.LoadBalancingRule{ Name: to.StringPtr(ruleName), LoadBalancingRulePropertiesFormat: &network.LoadBalancingRulePropertiesFormat{ @@ -381,7 +479,7 @@ func getTestSecurityGroup(services ...v1.Service) network.SecurityGroup { for _, port := range service.Spec.Ports { sources := getServiceSourceRanges(&service) for _, src := range sources { - ruleName := getSecurityRuleName(&service, port, src) + ruleName := getSecurityRuleName(&service, port, nil, src) rules = append(rules, network.SecurityRule{ Name: to.StringPtr(ruleName), SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{ @@ -412,7 +510,7 @@ func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, servi } for _, wantedRule := range svc.Spec.Ports { expectedRuleCount++ - wantedRuleName := getLoadBalancerRuleName(&svc, wantedRule) + wantedRuleName := getLoadBalancerRuleName(&svc, wantedRule, subnet(&svc)) foundRule := false for _, actualRule := range *loadBalancer.LoadBalancingRules { if strings.EqualFold(*actualRule.Name, wantedRuleName) && @@ -484,7 +582,7 @@ func validateSecurityGroup(t *testing.T, securityGroup network.SecurityGroup, se for _, wantedRule := range svc.Spec.Ports { sources := getServiceSourceRanges(&svc) for _, source := range sources { - wantedRuleName := getSecurityRuleName(&svc, wantedRule, source) + wantedRuleName := getSecurityRuleName(&svc, wantedRule, nil, source) expectedRuleCount++ foundRule := false for _, actualRule := range *securityGroup.SecurityRules { @@ -887,3 +985,8 @@ func TestMetadataParsing(t *testing.T) { t.Errorf("Unexpected inequality:\n%#v\nvs\n%#v", network, networkJSON) } } + +func addTestSubnet(svc *v1.Service) { + svc.Annotations[ServiceAnnotationLoadBalancerInternal] = "true" + svc.Annotations[ServiceAnnotationLoadBalancerInternalSubnet] = "TestSubnet" +} diff --git a/pkg/cloudprovider/providers/azure/azure_util.go b/pkg/cloudprovider/providers/azure/azure_util.go index e6b8c4922a7..7a6b48e8d9d 100644 --- a/pkg/cloudprovider/providers/azure/azure_util.go +++ b/pkg/cloudprovider/providers/azure/azure_util.go @@ -195,13 +195,19 @@ func getBackendPoolName(clusterName string) string { return clusterName } -func getLoadBalancerRuleName(service *v1.Service, port v1.ServicePort) string { - return fmt.Sprintf("%s-%s-%d", getRulePrefix(service), port.Protocol, port.Port) +func getLoadBalancerRuleName(service *v1.Service, port v1.ServicePort, subnetName *string) string { + if subnetName == nil { + return fmt.Sprintf("%s-%s-%d", getRulePrefix(service), port.Protocol, port.Port) + } + return fmt.Sprintf("%s-%s-%s-%d", getRulePrefix(service), *subnetName, port.Protocol, port.Port) } -func getSecurityRuleName(service *v1.Service, port v1.ServicePort, sourceAddrPrefix string) string { +func getSecurityRuleName(service *v1.Service, port v1.ServicePort, subnetName *string, sourceAddrPrefix string) string { safePrefix := strings.Replace(sourceAddrPrefix, "/", "_", -1) - return fmt.Sprintf("%s-%s-%d-%s", getRulePrefix(service), port.Protocol, port.Port, safePrefix) + if subnetName == nil { + return fmt.Sprintf("%s-%s-%d-%s", getRulePrefix(service), port.Protocol, port.Port, safePrefix) + } + return fmt.Sprintf("%s-%s-%s-%d-%s", getRulePrefix(service), *subnetName, port.Protocol, port.Port, safePrefix) } // This returns a human-readable version of the Service used to tag some resources. @@ -224,8 +230,17 @@ func serviceOwnsRule(service *v1.Service, rule string) bool { return strings.HasPrefix(strings.ToUpper(rule), strings.ToUpper(prefix)) } -func getFrontendIPConfigName(service *v1.Service) string { - return cloudprovider.GetLoadBalancerName(service) +func serviceOwnsFrontEndIP(fip network.FrontendIPConfiguration, service *v1.Service) bool { + baseName := cloudprovider.GetLoadBalancerName(service) + return strings.HasPrefix(*fip.Name, baseName) +} + +func getFrontendIPConfigName(service *v1.Service, subnetName *string) string { + baseName := cloudprovider.GetLoadBalancerName(service) + if subnetName != nil { + return fmt.Sprintf("%s-%s", baseName, *subnetName) + } + return baseName } // This returns the next available rule priority level for a given set of security rules.