mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
azure: loadbalancer: support UDP svc ports+rules
This commit is contained in:
parent
8b50b83067
commit
355c2be7a0
@ -503,10 +503,11 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration
|
||||
} else {
|
||||
ports = []v1.ServicePort{}
|
||||
}
|
||||
expectedProbes := make([]network.Probe, len(ports))
|
||||
expectedRules := make([]network.LoadBalancingRule, len(ports))
|
||||
for i, port := range ports {
|
||||
lbRuleName := getRuleName(service, port)
|
||||
lbRuleName := getRuleName(service, port)
|
||||
|
||||
var expectedProbes []network.Probe
|
||||
var expectedRules []network.LoadBalancingRule
|
||||
for _, port := range ports {
|
||||
|
||||
transportProto, _, probeProto, err := getProtocolsFromKubernetesProtocol(port.Protocol)
|
||||
if err != nil {
|
||||
@ -514,9 +515,16 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration
|
||||
}
|
||||
|
||||
if serviceapi.NeedsHealthCheck(service) {
|
||||
if port.Protocol == v1.ProtocolUDP {
|
||||
// ERROR: this isn't supported
|
||||
// health check (aka source ip preservation) is not
|
||||
// compatible with UDP (it uses an HTTP check)
|
||||
return lb, false, fmt.Errorf("services requiring health checks are incompatible with UDP ports")
|
||||
}
|
||||
|
||||
podPresencePath, podPresencePort := serviceapi.GetServiceHealthCheckPathPort(service)
|
||||
|
||||
expectedProbes[i] = network.Probe{
|
||||
expectedProbes = append(expectedProbes, network.Probe{
|
||||
Name: &lbRuleName,
|
||||
ProbePropertiesFormat: &network.ProbePropertiesFormat{
|
||||
RequestPath: to.StringPtr(podPresencePath),
|
||||
@ -525,42 +533,49 @@ func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, fipConfiguration
|
||||
IntervalInSeconds: to.Int32Ptr(5),
|
||||
NumberOfProbes: to.Int32Ptr(2),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
expectedProbes[i] = network.Probe{
|
||||
})
|
||||
} else if port.Protocol != v1.ProtocolUDP {
|
||||
// we only add the expected probe if we're doing TCP
|
||||
expectedProbes = append(expectedProbes, network.Probe{
|
||||
Name: &lbRuleName,
|
||||
ProbePropertiesFormat: &network.ProbePropertiesFormat{
|
||||
Protocol: probeProto,
|
||||
Protocol: *probeProto,
|
||||
Port: to.Int32Ptr(port.NodePort),
|
||||
IntervalInSeconds: to.Int32Ptr(5),
|
||||
NumberOfProbes: to.Int32Ptr(2),
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
loadDistribution := network.Default
|
||||
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
|
||||
loadDistribution = network.SourceIP
|
||||
}
|
||||
expectedRules[i] = network.LoadBalancingRule{
|
||||
expectedRule := network.LoadBalancingRule{
|
||||
Name: &lbRuleName,
|
||||
LoadBalancingRulePropertiesFormat: &network.LoadBalancingRulePropertiesFormat{
|
||||
Protocol: transportProto,
|
||||
Protocol: *transportProto,
|
||||
FrontendIPConfiguration: &network.SubResource{
|
||||
ID: to.StringPtr(lbFrontendIPConfigID),
|
||||
},
|
||||
BackendAddressPool: &network.SubResource{
|
||||
ID: to.StringPtr(lbBackendPoolID),
|
||||
},
|
||||
Probe: &network.SubResource{
|
||||
ID: to.StringPtr(az.getLoadBalancerProbeID(lbName, lbRuleName)),
|
||||
},
|
||||
LoadDistribution: loadDistribution,
|
||||
FrontendPort: to.Int32Ptr(port.Port),
|
||||
BackendPort: to.Int32Ptr(port.Port),
|
||||
EnableFloatingIP: to.BoolPtr(true),
|
||||
},
|
||||
}
|
||||
|
||||
// we didn't construct the probe objects for UDP because they're not used/needed/allowed
|
||||
if port.Protocol != v1.ProtocolUDP {
|
||||
expectedRule.Probe = &network.SubResource{
|
||||
ID: to.StringPtr(az.getLoadBalancerProbeID(lbName, lbRuleName)),
|
||||
}
|
||||
}
|
||||
|
||||
expectedRules = append(expectedRules, expectedRule)
|
||||
}
|
||||
|
||||
// remove unwanted probes
|
||||
@ -685,7 +700,7 @@ func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName st
|
||||
expectedSecurityRules[ix] = network.SecurityRule{
|
||||
Name: to.StringPtr(securityRuleName),
|
||||
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
|
||||
Protocol: securityProto,
|
||||
Protocol: *securityProto,
|
||||
SourcePortRange: to.StringPtr("*"),
|
||||
DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))),
|
||||
SourceAddressPrefix: to.StringPtr(sourceAddressPrefixes[j]),
|
||||
|
@ -35,11 +35,18 @@ var testClusterName = "testCluster"
|
||||
// Test additional of a new service/port.
|
||||
func TestReconcileLoadBalancerAddPort(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc := getTestService("servicea", 80)
|
||||
svc := getTestService("servicea", v1.ProtocolTCP, 80)
|
||||
configProperties := getTestPublicFipConfigurationProperties()
|
||||
lb := getTestLoadBalancer()
|
||||
nodes := []*v1.Node{}
|
||||
|
||||
svc.Spec.Ports = append(svc.Spec.Ports, v1.ServicePort{
|
||||
Name: fmt.Sprintf("port-udp-%d", 1234),
|
||||
Protocol: v1.ProtocolUDP,
|
||||
Port: 1234,
|
||||
NodePort: getBackendPort(1234),
|
||||
})
|
||||
|
||||
lb, updated, err := az.reconcileLoadBalancer(lb, &configProperties, testClusterName, &svc, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %q", err)
|
||||
@ -59,7 +66,7 @@ func TestReconcileLoadBalancerAddPort(t *testing.T) {
|
||||
|
||||
func TestReconcileLoadBalancerNodeHealth(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc := getTestService("servicea", 80)
|
||||
svc := getTestService("servicea", v1.ProtocolTCP, 80)
|
||||
svc.Annotations = map[string]string{
|
||||
serviceapi.BetaAnnotationExternalTraffic: serviceapi.AnnotationValueExternalTrafficLocal,
|
||||
serviceapi.BetaAnnotationHealthCheckNodePort: "32456",
|
||||
@ -89,7 +96,7 @@ func TestReconcileLoadBalancerNodeHealth(t *testing.T) {
|
||||
// Test removing all services results in removing the frontend ip configuration
|
||||
func TestReconcileLoadBalancerRemoveService(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc := getTestService("servicea", 80, 443)
|
||||
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
|
||||
lb := getTestLoadBalancer()
|
||||
configProperties := getTestPublicFipConfigurationProperties()
|
||||
nodes := []*v1.Node{}
|
||||
@ -120,7 +127,7 @@ func TestReconcileLoadBalancerRemoveService(t *testing.T) {
|
||||
// Test removing all service ports results in removing the frontend ip configuration
|
||||
func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc := getTestService("servicea", 80)
|
||||
svc := getTestService("servicea", v1.ProtocolTCP, 80)
|
||||
lb := getTestLoadBalancer()
|
||||
configProperties := getTestPublicFipConfigurationProperties()
|
||||
nodes := []*v1.Node{}
|
||||
@ -131,7 +138,7 @@ func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T)
|
||||
}
|
||||
validateLoadBalancer(t, lb, svc)
|
||||
|
||||
svcUpdated := getTestService("servicea")
|
||||
svcUpdated := getTestService("servicea", v1.ProtocolTCP)
|
||||
lb, updated, err = az.reconcileLoadBalancer(lb, nil, testClusterName, &svcUpdated, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %q", err)
|
||||
@ -152,13 +159,13 @@ func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T)
|
||||
// Test removal of a port from an existing service.
|
||||
func TestReconcileLoadBalancerRemovesPort(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc := getTestService("servicea", 80, 443)
|
||||
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
|
||||
configProperties := getTestPublicFipConfigurationProperties()
|
||||
nodes := []*v1.Node{}
|
||||
|
||||
existingLoadBalancer := getTestLoadBalancer(svc)
|
||||
|
||||
svcUpdated := getTestService("servicea", 80)
|
||||
svcUpdated := getTestService("servicea", v1.ProtocolTCP, 80)
|
||||
updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &configProperties, testClusterName, &svcUpdated, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %q", err)
|
||||
@ -170,8 +177,8 @@ func TestReconcileLoadBalancerRemovesPort(t *testing.T) {
|
||||
// Test reconciliation of multiple services on same port
|
||||
func TestReconcileLoadBalancerMultipleServices(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc1 := getTestService("servicea", 80, 443)
|
||||
svc2 := getTestService("serviceb", 80)
|
||||
svc1 := getTestService("servicea", v1.ProtocolTCP, 80, 443)
|
||||
svc2 := getTestService("serviceb", v1.ProtocolTCP, 80)
|
||||
configProperties := getTestPublicFipConfigurationProperties()
|
||||
nodes := []*v1.Node{}
|
||||
|
||||
@ -192,7 +199,7 @@ func TestReconcileLoadBalancerMultipleServices(t *testing.T) {
|
||||
|
||||
func TestReconcileSecurityGroupNewServiceAddsPort(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc1 := getTestService("serviceea", 80)
|
||||
svc1 := getTestService("serviceea", v1.ProtocolTCP, 80)
|
||||
|
||||
sg := getTestSecurityGroup()
|
||||
|
||||
@ -219,8 +226,8 @@ func TestReconcileSecurityGroupNewInternalServiceAddsPort(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReconcileSecurityGroupRemoveService(t *testing.T) {
|
||||
service1 := getTestService("servicea", 81)
|
||||
service2 := getTestService("serviceb", 82)
|
||||
service1 := getTestService("servicea", v1.ProtocolTCP, 81)
|
||||
service2 := getTestService("serviceb", v1.ProtocolTCP, 82)
|
||||
|
||||
sg := getTestSecurityGroup(service1, service2)
|
||||
|
||||
@ -236,11 +243,11 @@ func TestReconcileSecurityGroupRemoveService(t *testing.T) {
|
||||
|
||||
func TestReconcileSecurityGroupRemoveServiceRemovesPort(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc := getTestService("servicea", 80, 443)
|
||||
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
|
||||
|
||||
sg := getTestSecurityGroup(svc)
|
||||
|
||||
svcUpdated := getTestService("servicea", 80)
|
||||
svcUpdated := getTestService("servicea", v1.ProtocolTCP, 80)
|
||||
sg, _, err := az.reconcileSecurityGroup(sg, testClusterName, &svcUpdated, true)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %q", err)
|
||||
@ -251,7 +258,7 @@ func TestReconcileSecurityGroupRemoveServiceRemovesPort(t *testing.T) {
|
||||
|
||||
func TestReconcileSecurityWithSourceRanges(t *testing.T) {
|
||||
az := getTestCloud()
|
||||
svc := getTestService("servicea", 80, 443)
|
||||
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
|
||||
svc.Spec.LoadBalancerSourceRanges = []string{
|
||||
"192.168.0.1/24",
|
||||
"10.0.0.1/32",
|
||||
@ -291,12 +298,12 @@ func getTestPublicFipConfigurationProperties() network.FrontendIPConfigurationPr
|
||||
}
|
||||
}
|
||||
|
||||
func getTestService(identifier string, requestedPorts ...int32) v1.Service {
|
||||
func getTestService(identifier string, proto v1.Protocol, requestedPorts ...int32) v1.Service {
|
||||
ports := []v1.ServicePort{}
|
||||
for _, port := range requestedPorts {
|
||||
ports = append(ports, v1.ServicePort{
|
||||
Name: fmt.Sprintf("port-%d", port),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
Name: fmt.Sprintf("port-tcp-%d", port),
|
||||
Protocol: proto,
|
||||
Port: port,
|
||||
NodePort: getBackendPort(port),
|
||||
})
|
||||
@ -317,7 +324,7 @@ func getTestService(identifier string, requestedPorts ...int32) v1.Service {
|
||||
}
|
||||
|
||||
func getInternalTestService(identifier string, requestedPorts ...int32) v1.Service {
|
||||
svc := getTestService(identifier, requestedPorts...)
|
||||
svc := getTestService(identifier, v1.ProtocolTCP, requestedPorts...)
|
||||
svc.Annotations[ServiceAnnotationLoadBalancerInternal] = "true"
|
||||
|
||||
return svc
|
||||
@ -398,6 +405,7 @@ func getTestSecurityGroup(services ...v1.Service) network.SecurityGroup {
|
||||
func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, services ...v1.Service) {
|
||||
expectedRuleCount := 0
|
||||
expectedFrontendIPCount := 0
|
||||
expectedProbeCount := 0
|
||||
for _, svc := range services {
|
||||
if len(svc.Spec.Ports) > 0 {
|
||||
expectedFrontendIPCount++
|
||||
@ -418,6 +426,12 @@ func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, servi
|
||||
t.Errorf("Expected load balancer rule but didn't find it: %q", wantedRuleName)
|
||||
}
|
||||
|
||||
// if UDP rule, there is no probe
|
||||
if wantedRule.Protocol == v1.ProtocolUDP {
|
||||
continue
|
||||
}
|
||||
|
||||
expectedProbeCount++
|
||||
foundProbe := false
|
||||
if serviceapi.NeedsHealthCheck(&svc) {
|
||||
path, port := serviceapi.GetServiceHealthCheckPathPort(&svc)
|
||||
@ -457,8 +471,9 @@ func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, servi
|
||||
if lenRules != expectedRuleCount {
|
||||
t.Errorf("Expected the loadbalancer to have %d rules. Found %d.\n%v", expectedRuleCount, lenRules, loadBalancer.LoadBalancingRules)
|
||||
}
|
||||
|
||||
lenProbes := len(*loadBalancer.Probes)
|
||||
if lenProbes != expectedRuleCount {
|
||||
if lenProbes != expectedProbeCount {
|
||||
t.Errorf("Expected the loadbalancer to have %d probes. Found %d.", expectedRuleCount, lenProbes)
|
||||
}
|
||||
}
|
||||
|
@ -122,13 +122,24 @@ func getLastSegment(ID string) (string, error) {
|
||||
|
||||
// returns the equivalent LoadBalancerRule, SecurityRule and LoadBalancerProbe
|
||||
// protocol types for the given Kubernetes protocol type.
|
||||
func getProtocolsFromKubernetesProtocol(protocol v1.Protocol) (network.TransportProtocol, network.SecurityRuleProtocol, network.ProbeProtocol, error) {
|
||||
func getProtocolsFromKubernetesProtocol(protocol v1.Protocol) (*network.TransportProtocol, *network.SecurityRuleProtocol, *network.ProbeProtocol, error) {
|
||||
var transportProto network.TransportProtocol
|
||||
var securityProto network.SecurityRuleProtocol
|
||||
var probeProto network.ProbeProtocol
|
||||
|
||||
switch protocol {
|
||||
case v1.ProtocolTCP:
|
||||
return network.TransportProtocolTCP, network.TCP, network.ProbeProtocolTCP, nil
|
||||
transportProto = network.TransportProtocolTCP
|
||||
securityProto = network.TCP
|
||||
probeProto = network.ProbeProtocolTCP
|
||||
case v1.ProtocolUDP:
|
||||
transportProto = network.TransportProtocolUDP
|
||||
securityProto = network.UDP
|
||||
default:
|
||||
return "", "", "", fmt.Errorf("Only TCP is supported for Azure LoadBalancers")
|
||||
return &transportProto, &securityProto, &probeProto, fmt.Errorf("Only TCP and UDP are supported for Azure LoadBalancers")
|
||||
}
|
||||
|
||||
return &transportProto, &securityProto, &probeProto, nil
|
||||
}
|
||||
|
||||
// This returns the full identifier of the primary NIC for the given VM.
|
||||
|
Loading…
Reference in New Issue
Block a user