From e0d125b0462296588c8bc84540f7dbe9d4c0ca70 Mon Sep 17 00:00:00 2001 From: knight42 Date: Sat, 30 May 2020 01:55:48 +0800 Subject: [PATCH 1/2] fix(service::repair): accept same nodePort with different protocols Signed-off-by: knight42 --- .../service/portallocator/controller/BUILD | 1 + .../portallocator/controller/repair.go | 39 ++++++++-- .../portallocator/controller/repair_test.go | 78 +++++++++++++++++++ 3 files changed, 110 insertions(+), 8 deletions(-) diff --git a/pkg/registry/core/service/portallocator/controller/BUILD b/pkg/registry/core/service/portallocator/controller/BUILD index c4ea3f284d0..0b2a189494e 100644 --- a/pkg/registry/core/service/portallocator/controller/BUILD +++ b/pkg/registry/core/service/portallocator/controller/BUILD @@ -20,6 +20,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go index 32ad5b802b0..0b4d001010b 100644 --- a/pkg/registry/core/service/portallocator/controller/repair.go +++ b/pkg/registry/core/service/portallocator/controller/repair.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -139,7 +140,7 @@ func (c *Repair) runOnce() error { case nil: if stored.Has(port) { // remove it from the old set, so we can find leaks - stored.Release(port) + _ = stored.Release(port) } else { // doesn't seem to be allocated c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortNotAllocated", "Port %d is not allocated; repairing", port) @@ -200,17 +201,39 @@ func (c *Repair) runOnce() error { return nil } +// collectServiceNodePorts returns nodePorts specified in the Service. +// Please note that: +// 1. same nodePort with *same* protocol will be duplicated as it is +// 2. same nodePort with *different* protocol will be deduplicated func collectServiceNodePorts(service *corev1.Service) []int { - servicePorts := []int{} - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - if servicePort.NodePort != 0 { - servicePorts = append(servicePorts, int(servicePort.NodePort)) + var servicePorts []int + // map from nodePort to set of protocols + seen := make(map[int]sets.String) + for _, port := range service.Spec.Ports { + nodePort := int(port.NodePort) + if nodePort == 0 { + continue } + proto := string(port.Protocol) + s := seen[nodePort] + if s == nil { // have not seen this nodePort before + s = sets.NewString(proto) + servicePorts = append(servicePorts, nodePort) + } else if s.Has(proto) { // same nodePort with same protocol + servicePorts = append(servicePorts, nodePort) + } else { // same nodePort with different protocol + s.Insert(proto) + } + seen[nodePort] = s } - if service.Spec.HealthCheckNodePort != 0 { - servicePorts = append(servicePorts, int(service.Spec.HealthCheckNodePort)) + healthPort := int(service.Spec.HealthCheckNodePort) + if healthPort != 0 { + s := seen[healthPort] + // TODO: is it safe to assume the protocol is always TCP? + if s == nil || s.Has(string(corev1.ProtocolTCP)) { + servicePorts = append(servicePorts, healthPort) + } } return servicePorts diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go index ad3b1848fd3..b936e150cfa 100644 --- a/pkg/registry/core/service/portallocator/controller/repair_test.go +++ b/pkg/registry/core/service/portallocator/controller/repair_test.go @@ -18,6 +18,8 @@ package controller import ( "fmt" + "reflect" + "sort" "strings" "testing" @@ -203,3 +205,79 @@ func TestRepairWithExisting(t *testing.T) { t.Errorf("unexpected portallocator state: %d free", free) } } + +func TestCollectServiceNodePorts(t *testing.T) { + tests := []struct { + name string + serviceSpec corev1.ServiceSpec + expected []int + }{ + { + name: "no duplicated nodePorts", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolUDP}, + {NodePort: 113, Protocol: corev1.ProtocolUDP}, + }, + }, + expected: []int{111, 112, 113}, + }, + { + name: "duplicated nodePort with different protocol", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + }, + }, + expected: []int{111, 112}, + }, + { + name: "no duplicated port(with health check port)", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolUDP}, + }, + HealthCheckNodePort: 113, + }, + expected: []int{111, 112, 113}, + }, + { + name: "nodePort has different protocol with duplicated health check port", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + }, + HealthCheckNodePort: 111, + }, + expected: []int{111, 112}, + }, + { + name: "nodePort has same protocol with duplicated health check port", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + }, + HealthCheckNodePort: 112, + }, + expected: []int{111, 112, 112}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ports := collectServiceNodePorts(&corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"}, + Spec: tc.serviceSpec, + }) + sort.Ints(ports) + if !reflect.DeepEqual(tc.expected, ports) { + t.Fatalf("Invalid result\nexpected: %v\ngot: %v", tc.expected, ports) + } + }) + } +} From 136849728ca984452c0b73120dc0ac4704d44e6c Mon Sep 17 00:00:00 2001 From: knight42 Date: Sat, 13 Jun 2020 09:30:20 +0800 Subject: [PATCH 2/2] address comments --- .../portallocator/controller/repair.go | 2 +- .../portallocator/controller/repair_test.go | 24 ++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go index 0b4d001010b..a7b6d28d2da 100644 --- a/pkg/registry/core/service/portallocator/controller/repair.go +++ b/pkg/registry/core/service/portallocator/controller/repair.go @@ -140,7 +140,7 @@ func (c *Repair) runOnce() error { case nil: if stored.Has(port) { // remove it from the old set, so we can find leaks - _ = stored.Release(port) + stored.Release(port) } else { // doesn't seem to be allocated c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortNotAllocated", "Port %d is not allocated; repairing", port) diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go index b936e150cfa..e9a4dc242b3 100644 --- a/pkg/registry/core/service/portallocator/controller/repair_test.go +++ b/pkg/registry/core/service/portallocator/controller/repair_test.go @@ -223,6 +223,28 @@ func TestCollectServiceNodePorts(t *testing.T) { }, expected: []int{111, 112, 113}, }, + { + name: "duplicated nodePort with TCP protocol", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 111, Protocol: corev1.ProtocolTCP}, + {NodePort: 112, Protocol: corev1.ProtocolUDP}, + }, + }, + expected: []int{111, 111, 112}, + }, + { + name: "duplicated nodePort with UDP protocol", + serviceSpec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 111, Protocol: corev1.ProtocolUDP}, + {NodePort: 112, Protocol: corev1.ProtocolTCP}, + }, + }, + expected: []int{111, 111, 112}, + }, { name: "duplicated nodePort with different protocol", serviceSpec: corev1.ServiceSpec{ @@ -257,7 +279,7 @@ func TestCollectServiceNodePorts(t *testing.T) { expected: []int{111, 112}, }, { - name: "nodePort has same protocol with duplicated health check port", + name: "nodePort has same protocol as duplicated health check port", serviceSpec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ {NodePort: 111, Protocol: corev1.ProtocolUDP},