diff --git a/pkg/registry/core/service/portallocator/controller/BUILD b/pkg/registry/core/service/portallocator/controller/BUILD index c4ea3f284d0..0b2a189494e 100644 --- a/pkg/registry/core/service/portallocator/controller/BUILD +++ b/pkg/registry/core/service/portallocator/controller/BUILD @@ -20,6 +20,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go index 32ad5b802b0..a7b6d28d2da 100644 --- a/pkg/registry/core/service/portallocator/controller/repair.go +++ b/pkg/registry/core/service/portallocator/controller/repair.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -200,17 +201,39 @@ func (c *Repair) runOnce() error { return nil } +// collectServiceNodePorts returns nodePorts specified in the Service. +// Please note that: +// 1. same nodePort with *same* protocol will be duplicated as it is +// 2. same nodePort with *different* protocol will be deduplicated func collectServiceNodePorts(service *corev1.Service) []int { - servicePorts := []int{} - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - if servicePort.NodePort != 0 { - servicePorts = append(servicePorts, int(servicePort.NodePort)) + var servicePorts []int + // map from nodePort to set of protocols + seen := make(map[int]sets.String) + for _, port := range service.Spec.Ports { + nodePort := int(port.NodePort) + if nodePort == 0 { + continue } + proto := string(port.Protocol) + s := seen[nodePort] + if s == nil { // have not seen this nodePort before + s = sets.NewString(proto) + servicePorts = append(servicePorts, nodePort) + } else if s.Has(proto) { // same nodePort with same protocol + servicePorts = append(servicePorts, nodePort) + } else { // same nodePort with different protocol + s.Insert(proto) + } + seen[nodePort] = s } - if service.Spec.HealthCheckNodePort != 0 { - servicePorts = append(servicePorts, int(service.Spec.HealthCheckNodePort)) + healthPort := int(service.Spec.HealthCheckNodePort) + if healthPort != 0 { + s := seen[healthPort] + // TODO: is it safe to assume the protocol is always TCP? + if s == nil || s.Has(string(corev1.ProtocolTCP)) { + servicePorts = append(servicePorts, healthPort) + } } return servicePorts diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go index ad3b1848fd3..e9a4dc242b3 100644 --- a/pkg/registry/core/service/portallocator/controller/repair_test.go +++ b/pkg/registry/core/service/portallocator/controller/repair_test.go @@ -18,6 +18,8 @@ package controller import ( "fmt" + "reflect" + "sort" "strings" "testing" @@ -203,3 +205,101 @@ func TestRepairWithExisting(t *testing.T) { t.Errorf("unexpected portallocator state: %d free", free) } } + +func TestCollectServiceNodePorts(t *testing.T) { + tests := []struct { + name string + serviceSpec corev1.ServiceSpec + expected []int + }{ + { + name: "no duplicated nodePorts", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolUDP}, + {NodePort: 113, Protocol: corev1.ProtocolUDP}, + }, + }, + expected: []int{111, 112, 113}, + }, + { + name: "duplicated nodePort with TCP protocol", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolUDP}, + }, + }, + expected: []int{111, 111, 112}, + }, + { + name: "duplicated nodePort with UDP protocol", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + }, + }, + expected: []int{111, 111, 112}, + }, + { + name: "duplicated nodePort with different protocol", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + }, + }, + expected: []int{111, 112}, + }, + { + name: "no duplicated port(with health check port)", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolUDP}, + }, + HealthCheckNodePort: 113, + }, + expected: []int{111, 112, 113}, + }, + { + name: "nodePort has different protocol with duplicated health check port", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + }, + HealthCheckNodePort: 111, + }, + expected: []int{111, 112}, + }, + { + name: "nodePort has same protocol as duplicated health check port", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + }, + HealthCheckNodePort: 112, + }, + expected: []int{111, 112, 112}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ports := collectServiceNodePorts(&corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"}, + Spec: tc.serviceSpec, + }) + sort.Ints(ports) + if !reflect.DeepEqual(tc.expected, ports) { + t.Fatalf("Invalid result\nexpected: %v\ngot: %v", tc.expected, ports) + } + }) + } +}