mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #115966 from aojea/lb_mixed
don't process unsupported loadbalancers with mixed protocols
This commit is contained in:
commit
bfc23bbf19
@ -4515,6 +4515,9 @@ const (
|
||||
// LoadBalancerPortsError represents the condition of the requested ports
|
||||
// on the cloud load balancer instance.
|
||||
LoadBalancerPortsError = "LoadBalancerPortsError"
|
||||
// LoadBalancerPortsErrorReason reason in ServiceStatus condition LoadBalancerPortsError
|
||||
// means the LoadBalancer was not able to be configured correctly.
|
||||
LoadBalancerPortsErrorReason = "LoadBalancerMixedProtocolNotSupported"
|
||||
)
|
||||
|
||||
// ServiceStatus represents the current status of a service.
|
||||
@ -6872,6 +6875,13 @@ const (
|
||||
PortForwardRequestIDHeader = "requestID"
|
||||
)
|
||||
|
||||
const (
|
||||
// MixedProtocolNotSupported error in PortStatus means that the cloud provider
|
||||
// can't publish the port on the load balancer because mixed values of protocols
|
||||
// on the same LoadBalancer type of Service are not supported by the cloud provider.
|
||||
MixedProtocolNotSupported = "MixedProtocolNotSupported"
|
||||
)
|
||||
|
||||
// PortStatus represents the error condition of a service port
|
||||
|
||||
type PortStatus struct {
|
||||
|
@ -27,6 +27,9 @@ import (
|
||||
"strings"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
|
||||
metav1apply "k8s.io/client-go/applyconfigurations/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
|
||||
@ -134,6 +137,28 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Services with multiples protocols are not supported by this controller, warn the users and sets
|
||||
// the corresponding Service Status Condition.
|
||||
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb
|
||||
if err := checkMixedProtocol(svc.Spec.Ports); err != nil {
|
||||
if hasLoadBalancerPortsError(svc) {
|
||||
return nil, err
|
||||
}
|
||||
klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name)
|
||||
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.")
|
||||
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
|
||||
metav1apply.Condition().
|
||||
WithType(v1.LoadBalancerPortsError).
|
||||
WithStatus(metav1.ConditionTrue).
|
||||
WithReason(v1.LoadBalancerPortsErrorReason).
|
||||
WithMessage("LoadBalancer with multiple protocols are not supported"))
|
||||
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
|
||||
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil {
|
||||
return nil, errApply
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, desiredScheme)
|
||||
|
||||
existingFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region)
|
||||
@ -187,6 +212,25 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc
|
||||
return err
|
||||
}
|
||||
|
||||
// Services with multiples protocols are not supported by this controller, warn the users and sets
|
||||
// the corresponding Service Status Condition, but keep processing the Update to not break upgrades.
|
||||
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb
|
||||
if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) {
|
||||
klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name)
|
||||
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.")
|
||||
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
|
||||
metav1apply.Condition().
|
||||
WithType(v1.LoadBalancerPortsError).
|
||||
WithStatus(metav1.ConditionTrue).
|
||||
WithReason(v1.LoadBalancerPortsErrorReason).
|
||||
WithMessage("LoadBalancer with multiple protocols are not supported"))
|
||||
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
|
||||
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-legacy-cloud-controller", Force: true}); errApply != nil {
|
||||
// the error is retried by the controller loop
|
||||
return errApply
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v, %v): updating with %d nodes", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, len(nodes))
|
||||
|
||||
switch scheme {
|
||||
@ -226,3 +270,33 @@ func getSvcScheme(svc *v1.Service) cloud.LbScheme {
|
||||
}
|
||||
return cloud.SchemeExternal
|
||||
}
|
||||
|
||||
// checkMixedProtocol checks if the Service Ports uses different protocols,
|
||||
// per examples, TCP and UDP.
|
||||
func checkMixedProtocol(ports []v1.ServicePort) error {
|
||||
if len(ports) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
firstProtocol := ports[0].Protocol
|
||||
for _, port := range ports[1:] {
|
||||
if port.Protocol != firstProtocol {
|
||||
return fmt.Errorf("mixed protocol is not supported for LoadBalancer")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// hasLoadBalancerPortsError checks if the Service has the LoadBalancerPortsError set to True
|
||||
func hasLoadBalancerPortsError(service *v1.Service) bool {
|
||||
if service == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, cond := range service.Status.Conditions {
|
||||
if cond.Type == v1.LoadBalancerPortsError {
|
||||
return cond.Status == metav1.ConditionTrue
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -21,10 +21,12 @@ package gce
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
@ -193,3 +195,256 @@ func TestProjectsBasePath(t *testing.T) {
|
||||
t.Errorf("Compute projectsBasePath has changed. Got %q, want %q or %q", gce.projectsBasePath, expectProjectsBasePath, expectMtlsProjectsBasePath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureLoadBalancerMixedProtocols(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
nodeNames := []string{"test-node-1"}
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
|
||||
apiService := fakeLoadbalancerService("")
|
||||
apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{
|
||||
Protocol: v1.ProtocolUDP,
|
||||
Port: int32(8080),
|
||||
})
|
||||
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
_, err = gce.EnsureLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes)
|
||||
if err == nil {
|
||||
t.Errorf("Expected error ensuring loadbalancer for Service with multiple ports")
|
||||
}
|
||||
if err.Error() != "mixed protocol is not supported for LoadBalancer" {
|
||||
t.Fatalf("unexpected error, got: %s wanted \"mixed protocol is not supported for LoadBalancer\"", err.Error())
|
||||
}
|
||||
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if !hasLoadBalancerPortsError(apiService) {
|
||||
t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateLoadBalancerMixedProtocols(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
vals := DefaultTestClusterValues()
|
||||
gce, err := fakeGCECloud(vals)
|
||||
require.NoError(t, err)
|
||||
|
||||
nodeNames := []string{"test-node-1"}
|
||||
nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName)
|
||||
require.NoError(t, err)
|
||||
|
||||
apiService := fakeLoadbalancerService("")
|
||||
apiService.Spec.Ports = append(apiService.Spec.Ports, v1.ServicePort{
|
||||
Protocol: v1.ProtocolUDP,
|
||||
Port: int32(8080),
|
||||
})
|
||||
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Create(context.TODO(), apiService, metav1.CreateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
// create an external loadbalancer to simulate an upgrade scenario where the loadbalancer exists
|
||||
// before the new controller is running and later the Service is updated
|
||||
_, err = createExternalLoadBalancer(gce, apiService, nodeNames, vals.ClusterName, vals.ClusterID, vals.ZoneName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = gce.UpdateLoadBalancer(context.Background(), vals.ClusterName, apiService, nodes)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
apiService, err = gce.client.CoreV1().Services(apiService.Namespace).Get(context.TODO(), apiService.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if !hasLoadBalancerPortsError(apiService) {
|
||||
t.Fatalf("Expected condition %v to be True, got %v", v1.LoadBalancerPortsError, apiService.Status.Conditions)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckMixedProtocol(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
annotations map[string]string
|
||||
ports []v1.ServicePort
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
name: "TCP",
|
||||
annotations: make(map[string]string),
|
||||
ports: []v1.ServicePort{
|
||||
{
|
||||
Protocol: v1.ProtocolTCP,
|
||||
Port: int32(8080),
|
||||
},
|
||||
},
|
||||
wantErr: nil,
|
||||
},
|
||||
{
|
||||
name: "UDP",
|
||||
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
|
||||
ports: []v1.ServicePort{
|
||||
{
|
||||
Protocol: v1.ProtocolUDP,
|
||||
Port: int32(8080),
|
||||
},
|
||||
},
|
||||
wantErr: nil,
|
||||
},
|
||||
{
|
||||
name: "TCP",
|
||||
annotations: make(map[string]string),
|
||||
ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "port80",
|
||||
Protocol: v1.ProtocolTCP,
|
||||
Port: int32(80),
|
||||
},
|
||||
{
|
||||
Name: "port8080",
|
||||
Protocol: v1.ProtocolTCP,
|
||||
Port: int32(8080),
|
||||
},
|
||||
},
|
||||
wantErr: nil,
|
||||
},
|
||||
{
|
||||
name: "UDP",
|
||||
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
|
||||
ports: []v1.ServicePort{
|
||||
{
|
||||
Name: "port80",
|
||||
Protocol: v1.ProtocolUDP,
|
||||
Port: int32(80),
|
||||
},
|
||||
{
|
||||
Name: "port8080",
|
||||
Protocol: v1.ProtocolUDP,
|
||||
Port: int32(8080),
|
||||
},
|
||||
},
|
||||
wantErr: nil,
|
||||
},
|
||||
{
|
||||
name: "TCP and UDP",
|
||||
annotations: map[string]string{ServiceAnnotationLoadBalancerType: "nlb"},
|
||||
ports: []v1.ServicePort{
|
||||
{
|
||||
Protocol: v1.ProtocolUDP,
|
||||
Port: int32(53),
|
||||
},
|
||||
{
|
||||
Protocol: v1.ProtocolTCP,
|
||||
Port: int32(53),
|
||||
},
|
||||
},
|
||||
wantErr: fmt.Errorf("mixed protocol is not supported for LoadBalancer"),
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
tt := test
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
err := checkMixedProtocol(tt.ports)
|
||||
if tt.wantErr != nil {
|
||||
assert.EqualError(t, err, tt.wantErr.Error())
|
||||
} else {
|
||||
assert.Equal(t, err, nil)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_hasLoadBalancerPortsError(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
service *v1.Service
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "no status",
|
||||
service: &v1.Service{},
|
||||
},
|
||||
{
|
||||
name: "condition set to true",
|
||||
service: &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "service1"},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIPs: []string{"1.2.3.4"},
|
||||
Type: "LoadBalancer",
|
||||
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}},
|
||||
},
|
||||
Status: v1.ServiceStatus{
|
||||
LoadBalancer: v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}},
|
||||
Conditions: []metav1.Condition{
|
||||
{
|
||||
Type: v1.LoadBalancerPortsError,
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "condition set false",
|
||||
service: &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "service1"},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIPs: []string{"1.2.3.4"},
|
||||
Type: "LoadBalancer",
|
||||
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}},
|
||||
},
|
||||
Status: v1.ServiceStatus{
|
||||
LoadBalancer: v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}},
|
||||
Conditions: []metav1.Condition{
|
||||
{
|
||||
Type: v1.LoadBalancerPortsError,
|
||||
Status: metav1.ConditionFalse,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multiple conditions unrelated",
|
||||
service: &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "service1"},
|
||||
Spec: v1.ServiceSpec{
|
||||
ClusterIPs: []string{"1.2.3.4"},
|
||||
Type: "LoadBalancer",
|
||||
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP"}},
|
||||
},
|
||||
Status: v1.ServiceStatus{
|
||||
LoadBalancer: v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{{IP: "2.3.4.5"}, {IP: "3.4.5.6"}}},
|
||||
Conditions: []metav1.Condition{
|
||||
{
|
||||
Type: "condition1",
|
||||
Status: metav1.ConditionFalse,
|
||||
},
|
||||
{
|
||||
Type: "condition2",
|
||||
Status: metav1.ConditionTrue,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := hasLoadBalancerPortsError(tt.service); got != tt.want {
|
||||
t.Errorf("hasLoadBalancerPortsError() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
servicehelper "k8s.io/cloud-provider/service/helpers"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
@ -57,6 +58,7 @@ func fakeGCECloud(vals TestClusterValues) (*Cloud, error) {
|
||||
gce.AlphaFeatureGate = NewAlphaFeatureGate([]string{})
|
||||
gce.nodeInformerSynced = func() bool { return true }
|
||||
gce.client = fake.NewSimpleClientset()
|
||||
gce.eventRecorder = &record.FakeRecorder{}
|
||||
|
||||
mockGCE := gce.c.(*cloud.MockGCE)
|
||||
mockGCE.MockTargetPools.AddInstanceHook = mock.AddInstanceHook
|
||||
|
Loading…
Reference in New Issue
Block a user