diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index 271dfa1d495..abca3a7f423 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -503,10 +503,11 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration } else { ports = []v1.ServicePort{} } - expectedProbes := make([]network.Probe, len(ports)) - expectedRules := make([]network.LoadBalancingRule, len(ports)) - for i, port := range ports { - lbRuleName := getRuleName(service, port) + lbRuleName := getRuleName(service, port) + + var expectedProbes []network.Probe + var expectedRules []network.LoadBalancingRule + for _, port := range ports { transportProto, _, probeProto, err := getProtocolsFromKubernetesProtocol(port.Protocol) if err != nil { @@ -514,9 +515,16 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration } if serviceapi.NeedsHealthCheck(service) { + if port.Protocol == v1.ProtocolUDP { + // ERROR: this isn't supported + // health check (aka source ip preservation) is not + // compatible with UDP (it uses an HTTP check) + return lb, false, fmt.Errorf("services requiring health checks are incompatible with UDP ports") + } + podPresencePath, podPresencePort := serviceapi.GetServiceHealthCheckPathPort(service) - expectedProbes[i] = network.Probe{ + expectedProbes = append(expectedProbes, network.Probe{ Name: &lbRuleName, ProbePropertiesFormat: &network.ProbePropertiesFormat{ RequestPath: to.StringPtr(podPresencePath), @@ -525,42 +533,49 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration IntervalInSeconds: to.Int32Ptr(5), NumberOfProbes: to.Int32Ptr(2), }, - } - } else { - expectedProbes[i] = network.Probe{ + }) + } else if port.Protocol != v1.ProtocolUDP { + // we only add the expected probe if we're doing TCP + expectedProbes = append(expectedProbes, network.Probe{ Name: &lbRuleName, ProbePropertiesFormat: &network.ProbePropertiesFormat{ - Protocol: probeProto, + Protocol: *probeProto, Port: to.Int32Ptr(port.NodePort), IntervalInSeconds: to.Int32Ptr(5), NumberOfProbes: to.Int32Ptr(2), }, - } + }) } loadDistribution := network.Default if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP { loadDistribution = network.SourceIP } - expectedRules[i] = network.LoadBalancingRule{ + expectedRule := network.LoadBalancingRule{ Name: &lbRuleName, LoadBalancingRulePropertiesFormat: &network.LoadBalancingRulePropertiesFormat{ - Protocol: transportProto, + Protocol: *transportProto, FrontendIPConfiguration: &network.SubResource{ ID: to.StringPtr(lbFrontendIPConfigID), }, BackendAddressPool: &network.SubResource{ ID: to.StringPtr(lbBackendPoolID), }, - Probe: &network.SubResource{ - ID: to.StringPtr(az.getLoadBalancerProbeID(lbName, lbRuleName)), - }, LoadDistribution: loadDistribution, FrontendPort: to.Int32Ptr(port.Port), BackendPort: to.Int32Ptr(port.Port), EnableFloatingIP: to.BoolPtr(true), }, } + + // we didn't construct the probe objects for UDP because they're not used/needed/allowed + if port.Protocol != v1.ProtocolUDP { + expectedRule.Probe = &network.SubResource{ + ID: to.StringPtr(az.getLoadBalancerProbeID(lbName, lbRuleName)), + } + } + + expectedRules = append(expectedRules, expectedRule) } // remove unwanted probes @@ -685,7 +700,7 @@ func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName st expectedSecurityRules[ix] = network.SecurityRule{ Name: to.StringPtr(securityRuleName), SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{ - Protocol: securityProto, + Protocol: *securityProto, SourcePortRange: to.StringPtr("*"), DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))), SourceAddressPrefix: to.StringPtr(sourceAddressPrefixes[j]), diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index af9bd96e20c..e38fe8979ac 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -35,11 +35,18 @@ var testClusterName = "testCluster" // Test additional of a new service/port. func TestReconcileLoadBalancerAddPort(t *testing.T) { az := getTestCloud() - svc := getTestService("servicea", 80) + svc := getTestService("servicea", v1.ProtocolTCP, 80) configProperties := getTestPublicFipConfigurationProperties() lb := getTestLoadBalancer() nodes := []*v1.Node{} + svc.Spec.Ports = append(svc.Spec.Ports, v1.ServicePort{ + Name: fmt.Sprintf("port-udp-%d", 1234), + Protocol: v1.ProtocolUDP, + Port: 1234, + NodePort: getBackendPort(1234), + }) + lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes) if err != nil { t.Errorf("Unexpected error: %q", err) @@ -59,7 +66,7 @@ func TestReconcileLoadBalancerAddPort(t *testing.T) { func TestReconcileLoadBalancerNodeHealth(t *testing.T) { az := getTestCloud() - svc := getTestService("servicea", 80) + svc := getTestService("servicea", v1.ProtocolTCP, 80) svc.Annotations = map[string]string{ serviceapi.BetaAnnotationExternalTraffic: serviceapi.AnnotationValueExternalTrafficLocal, serviceapi.BetaAnnotationHealthCheckNodePort: "32456", @@ -89,7 +96,7 @@ func TestReconcileLoadBalancerNodeHealth(t *testing.T) { // Test removing all services results in removing the frontend ip configuration func TestReconcileLoadBalancerRemoveService(t *testing.T) { az := getTestCloud() - svc := getTestService("servicea", 80, 443) + svc := getTestService("servicea", v1.ProtocolTCP, 80, 443) lb := getTestLoadBalancer() configProperties := getTestPublicFipConfigurationProperties() nodes := []*v1.Node{} @@ -120,7 +127,7 @@ func TestReconcileLoadBalancerRemoveService(t *testing.T) { // Test removing all service ports results in removing the frontend ip configuration func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T) { az := getTestCloud() - svc := getTestService("servicea", 80) + svc := getTestService("servicea", v1.ProtocolTCP, 80) lb := getTestLoadBalancer() configProperties := getTestPublicFipConfigurationProperties() nodes := []*v1.Node{} @@ -131,7 +138,7 @@ func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T) } validateLoadBalancer(t, lb, svc) - svcUpdated := getTestService("servicea") + svcUpdated := getTestService("servicea", v1.ProtocolTCP) lb, updated, err = az.reconcileLoadBalancer(lb, nil, testClusterName, &svcUpdated, nodes) if err != nil { t.Errorf("Unexpected error: %q", err) @@ -152,13 +159,13 @@ func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T) // Test removal of a port from an existing service. func TestReconcileLoadBalancerRemovesPort(t *testing.T) { az := getTestCloud() - svc := getTestService("servicea", 80, 443) + svc := getTestService("servicea", v1.ProtocolTCP, 80, 443) configProperties := getTestPublicFipConfigurationProperties() nodes := []*v1.Node{} existingLoadBalancer := getTestLoadBalancer(svc) - svcUpdated := getTestService("servicea", 80) + svcUpdated := getTestService("servicea", v1.ProtocolTCP, 80) updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &configProperties, testClusterName, &svcUpdated, nodes) if err != nil { t.Errorf("Unexpected error: %q", err) @@ -170,8 +177,8 @@ func TestReconcileLoadBalancerRemovesPort(t *testing.T) { // Test reconciliation of multiple services on same port func TestReconcileLoadBalancerMultipleServices(t *testing.T) { az := getTestCloud() - svc1 := getTestService("servicea", 80, 443) - svc2 := getTestService("serviceb", 80) + svc1 := getTestService("servicea", v1.ProtocolTCP, 80, 443) + svc2 := getTestService("serviceb", v1.ProtocolTCP, 80) configProperties := getTestPublicFipConfigurationProperties() nodes := []*v1.Node{} @@ -192,7 +199,7 @@ func TestReconcileLoadBalancerMultipleServices(t *testing.T) { func TestReconcileSecurityGroupNewServiceAddsPort(t *testing.T) { az := getTestCloud() - svc1 := getTestService("serviceea", 80) + svc1 := getTestService("serviceea", v1.ProtocolTCP, 80) sg := getTestSecurityGroup() @@ -219,8 +226,8 @@ func TestReconcileSecurityGroupNewInternalServiceAddsPort(t *testing.T) { } func TestReconcileSecurityGroupRemoveService(t *testing.T) { - service1 := getTestService("servicea", 81) - service2 := getTestService("serviceb", 82) + service1 := getTestService("servicea", v1.ProtocolTCP, 81) + service2 := getTestService("serviceb", v1.ProtocolTCP, 82) sg := getTestSecurityGroup(service1, service2) @@ -236,11 +243,11 @@ func TestReconcileSecurityGroupRemoveService(t *testing.T) { func TestReconcileSecurityGroupRemoveServiceRemovesPort(t *testing.T) { az := getTestCloud() - svc := getTestService("servicea", 80, 443) + svc := getTestService("servicea", v1.ProtocolTCP, 80, 443) sg := getTestSecurityGroup(svc) - svcUpdated := getTestService("servicea", 80) + svcUpdated := getTestService("servicea", v1.ProtocolTCP, 80) sg, _, err := az.reconcileSecurityGroup(sg, testClusterName, &svcUpdated, true) if err != nil { t.Errorf("Unexpected error: %q", err) @@ -251,7 +258,7 @@ func TestReconcileSecurityGroupRemoveServiceRemovesPort(t *testing.T) { func TestReconcileSecurityWithSourceRanges(t *testing.T) { az := getTestCloud() - svc := getTestService("servicea", 80, 443) + svc := getTestService("servicea", v1.ProtocolTCP, 80, 443) svc.Spec.LoadBalancerSourceRanges = []string{ "192.168.0.1/24", "10.0.0.1/32", @@ -291,12 +298,12 @@ func getTestPublicFipConfigurationProperties() network.FrontendIPConfigurationPr } } -func getTestService(identifier string, requestedPorts ...int32) v1.Service { +func getTestService(identifier string, proto v1.Protocol, requestedPorts ...int32) v1.Service { ports := []v1.ServicePort{} for _, port := range requestedPorts { ports = append(ports, v1.ServicePort{ - Name: fmt.Sprintf("port-%d", port), - Protocol: v1.ProtocolTCP, + Name: fmt.Sprintf("port-tcp-%d", port), + Protocol: proto, Port: port, NodePort: getBackendPort(port), }) @@ -317,7 +324,7 @@ func getTestService(identifier string, requestedPorts ...int32) v1.Service { } func getInternalTestService(identifier string, requestedPorts ...int32) v1.Service { - svc := getTestService(identifier, requestedPorts...) + svc := getTestService(identifier, v1.ProtocolTCP, requestedPorts...) svc.Annotations[ServiceAnnotationLoadBalancerInternal] = "true" return svc @@ -398,6 +405,7 @@ func getTestSecurityGroup(services ...v1.Service) network.SecurityGroup { func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, services ...v1.Service) { expectedRuleCount := 0 expectedFrontendIPCount := 0 + expectedProbeCount := 0 for _, svc := range services { if len(svc.Spec.Ports) > 0 { expectedFrontendIPCount++ @@ -418,6 +426,12 @@ func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, servi t.Errorf("Expected load balancer rule but didn't find it: %q", wantedRuleName) } + // if UDP rule, there is no probe + if wantedRule.Protocol == v1.ProtocolUDP { + continue + } + + expectedProbeCount++ foundProbe := false if serviceapi.NeedsHealthCheck(&svc) { path, port := serviceapi.GetServiceHealthCheckPathPort(&svc) @@ -457,8 +471,9 @@ func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, servi if lenRules != expectedRuleCount { t.Errorf("Expected the loadbalancer to have %d rules. Found %d.\n%v", expectedRuleCount, lenRules, loadBalancer.LoadBalancingRules) } + lenProbes := len(*loadBalancer.Probes) - if lenProbes != expectedRuleCount { + if lenProbes != expectedProbeCount { t.Errorf("Expected the loadbalancer to have %d probes. Found %d.", expectedRuleCount, lenProbes) } } diff --git a/pkg/cloudprovider/providers/azure/azure_util.go b/pkg/cloudprovider/providers/azure/azure_util.go index 0054e74d23f..660a7db8b2e 100644 --- a/pkg/cloudprovider/providers/azure/azure_util.go +++ b/pkg/cloudprovider/providers/azure/azure_util.go @@ -122,13 +122,24 @@ func getLastSegment(ID string) (string, error) { // returns the equivalent LoadBalancerRule, SecurityRule and LoadBalancerProbe // protocol types for the given Kubernetes protocol type. -func getProtocolsFromKubernetesProtocol(protocol v1.Protocol) (network.TransportProtocol, network.SecurityRuleProtocol, network.ProbeProtocol, error) { +func getProtocolsFromKubernetesProtocol(protocol v1.Protocol) (*network.TransportProtocol, *network.SecurityRuleProtocol, *network.ProbeProtocol, error) { + var transportProto network.TransportProtocol + var securityProto network.SecurityRuleProtocol + var probeProto network.ProbeProtocol + switch protocol { case v1.ProtocolTCP: - return network.TransportProtocolTCP, network.TCP, network.ProbeProtocolTCP, nil + transportProto = network.TransportProtocolTCP + securityProto = network.TCP + probeProto = network.ProbeProtocolTCP + case v1.ProtocolUDP: + transportProto = network.TransportProtocolUDP + securityProto = network.UDP default: - return "", "", "", fmt.Errorf("Only TCP is supported for Azure LoadBalancers") + return &transportProto, &securityProto, &probeProto, fmt.Errorf("Only TCP and UDP are supported for Azure LoadBalancers") } + + return &transportProto, &securityProto, &probeProto, nil } // This returns the full identifier of the primary NIC for the given VM.