diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 938c3aa6595..32e7ebad73f 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -712,6 +712,13 @@ const ( // Subdivide the ClusterIP range for dynamic and static IP allocation. ServiceIPStaticSubrange featuregate.Feature = "ServiceIPStaticSubrange" + // owner: @xuzhenglun + // kep: http://kep.k8s.io/3682 + // alpha: v1.27 + // + // Subdivide the NodePort range for dynamic and static port allocation. + ServiceNodePortStaticSubrange featuregate.Feature = "ServiceNodePortStaticSubrange" + // owner: @derekwaynecarr // alpha: v1.20 // beta: v1.22 @@ -1024,6 +1031,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS ServiceInternalTrafficPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28 + ServiceNodePortStaticSubrange: {Default: false, PreRelease: featuregate.Alpha}, + SizeMemoryBackedVolumes: {Default: true, PreRelease: featuregate.Beta}, StatefulSetAutoDeletePVC: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 206e87e8909..36c73114a1e 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -237,8 +237,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource } var serviceNodePortRegistry rangeallocation.RangeRegistry - serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { - mem := allocator.NewAllocationMap(max, rangeSpec) + serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { + mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) // TODO etcdallocator package to return a storage interface via the storageFactory etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations"))) if err != nil { @@ -250,6 +250,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err) } + serviceNodePortAllocator.EnableMetrics() restStorage.ServiceNodePortAllocator = serviceNodePortRegistry controllerStorage, err := controllerstore.NewStorage(restOptionsGetter) diff --git a/pkg/registry/core/service/portallocator/allocator.go b/pkg/registry/core/service/portallocator/allocator.go index 4f6cb68caf4..54837af026d 100644 --- a/pkg/registry/core/service/portallocator/allocator.go +++ b/pkg/registry/core/service/portallocator/allocator.go @@ -21,10 +21,11 @@ import ( "fmt" "k8s.io/apimachinery/pkg/util/net" - api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/registry/core/service/allocator" - + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" + api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/registry/core/service/allocator" ) // Interface manages the allocation of ports out of a range. Interface @@ -36,6 +37,7 @@ type Interface interface { ForEach(func(int)) Has(int) bool Destroy() + EnableMetrics() } var ( @@ -56,28 +58,42 @@ type PortAllocator struct { portRange net.PortRange alloc allocator.Interface + + // metrics is a metrics recorder that can be disabled + metrics metricsRecorderInterface } // PortAllocator implements Interface and Snapshottable var _ Interface = &PortAllocator{} // New creates a PortAllocator over a net.PortRange, calling allocatorFactory to construct the backing store. -func New(pr net.PortRange, allocatorFactory allocator.AllocatorFactory) (*PortAllocator, error) { +func New(pr net.PortRange, allocatorFactory allocator.AllocatorWithOffsetFactory) (*PortAllocator, error) { max := pr.Size rangeSpec := pr.String() a := &PortAllocator{ portRange: pr, + metrics: &emptyMetricsRecorder{}, } + + var offset = 0 + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceNodePortStaticSubrange) { + offset = calculateRangeOffset(pr) + } + var err error - a.alloc, err = allocatorFactory(max, rangeSpec) + a.alloc, err = allocatorFactory(max, rangeSpec, offset) + if err != nil { + return nil, err + } + return a, err } // NewInMemory creates an in-memory allocator. func NewInMemory(pr net.PortRange) (*PortAllocator, error) { - return New(pr, func(max int, rangeSpec string) (allocator.Interface, error) { - return allocator.NewAllocationMap(max, rangeSpec), nil + return New(pr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { + return allocator.NewAllocationMapWithOffset(max, rangeSpec, offset), nil }) } @@ -114,6 +130,9 @@ func (r *PortAllocator) Used() int { func (r *PortAllocator) Allocate(port int) error { ok, offset := r.contains(port) if !ok { + // update metrics + r.metrics.incrementAllocationErrors("static") + // include valid port range in error validPorts := r.portRange.String() return &ErrNotInRange{validPorts} @@ -121,11 +140,21 @@ func (r *PortAllocator) Allocate(port int) error { allocated, err := r.alloc.Allocate(offset) if err != nil { + // update metrics + r.metrics.incrementAllocationErrors("static") return err } if !allocated { + // update metrics + r.metrics.incrementAllocationErrors("static") return ErrAllocated } + + // update metrics + r.metrics.incrementAllocations("static") + r.metrics.setAllocated(r.Used()) + r.metrics.setAvailable(r.Free()) + return nil } @@ -134,11 +163,19 @@ func (r *PortAllocator) Allocate(port int) error { func (r *PortAllocator) AllocateNext() (int, error) { offset, ok, err := r.alloc.AllocateNext() if err != nil { + r.metrics.incrementAllocationErrors("dynamic") return 0, err } if !ok { + r.metrics.incrementAllocationErrors("dynamic") return 0, ErrFull } + + // update metrics + r.metrics.incrementAllocations("dynamic") + r.metrics.setAllocated(r.Used()) + r.metrics.setAvailable(r.Free()) + return r.portRange.Base + offset, nil } @@ -159,7 +196,13 @@ func (r *PortAllocator) Release(port int) error { return nil } - return r.alloc.Release(offset) + err := r.alloc.Release(offset) + if err == nil { + // update metrics + r.metrics.setAllocated(r.Used()) + r.metrics.setAvailable(r.Free()) + } + return err } // Has returns true if the provided port is already allocated and a call @@ -213,3 +256,37 @@ func (r *PortAllocator) contains(port int) (bool, int) { func (r *PortAllocator) Destroy() { r.alloc.Destroy() } + +// EnableMetrics enables metrics recording. +func (r *PortAllocator) EnableMetrics() { + registerMetrics() + r.metrics = &metricsRecorder{} +} + +// calculateRangeOffset estimates the offset used on the range for statically allocation based on +// the following formula `min(max($min, rangeSize/$step), $max)`, described as ~never less than +// $min or more than $max, with a graduated step function between them~. The function returns 0 +// if any of the parameters is invalid. +func calculateRangeOffset(pr net.PortRange) int { + // default values for min(max($min, rangeSize/$step), $max) + const ( + min = 16 + max = 128 + step = 32 + ) + + rangeSize := pr.Size + // offset should always be smaller than the range size + if rangeSize <= min { + return 0 + } + + offset := rangeSize / step + if offset < min { + return min + } + if offset > max { + return max + } + return int(offset) +} diff --git a/pkg/registry/core/service/portallocator/allocator_test.go b/pkg/registry/core/service/portallocator/allocator_test.go index 44b76dfbd66..37663f0cdc1 100644 --- a/pkg/registry/core/service/portallocator/allocator_test.go +++ b/pkg/registry/core/service/portallocator/allocator_test.go @@ -17,13 +17,16 @@ limitations under the License. package portallocator import ( - "testing" - "strconv" + "testing" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-base/metrics/testutil" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" ) func TestAllocate(t *testing.T) { @@ -118,6 +121,58 @@ func TestAllocate(t *testing.T) { } } +func TestAllocateReserved(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)() + + pr, err := net.ParsePortRange("30000-30128") + if err != nil { + t.Fatal(err) + } + + r, err := NewInMemory(*pr) + if err != nil { + t.Fatal(err) + } + // allocate all ports on the dynamic block + // dynamic block size is min(max(16,128/32),128) = 16 + dynamicOffset := calculateRangeOffset(*pr) + dynamicBlockSize := pr.Size - dynamicOffset + for i := 0; i < dynamicBlockSize; i++ { + if _, err := r.AllocateNext(); err != nil { + t.Errorf("Unexpected error trying to allocate: %v", err) + } + } + for i := dynamicOffset; i < pr.Size; i++ { + port := i + pr.Base + if !r.Has(port) { + t.Errorf("Port %d expected to be allocated", port) + } + } + if f := r.Free(); f != dynamicOffset { + t.Errorf("expected %d free ports, got %d", dynamicOffset, f) + } + // allocate all ports on the static block + for i := 0; i < dynamicOffset; i++ { + port := i + pr.Base + if err := r.Allocate(port); err != nil { + t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err) + } + } + if f := r.Free(); f != 0 { + t.Errorf("expected free equal to 0 got: %d", f) + } + // release one port in the allocated block and another a new one randomly + if err := r.Release(30053); err != nil { + t.Fatalf("Unexpected error trying to release port 30053: %v", err) + } + if _, err := r.AllocateNext(); err != nil { + t.Error(err) + } + if f := r.Free(); f != 0 { + t.Errorf("expected free equal to 0 got: %d", f) + } +} + func TestForEach(t *testing.T) { pr, err := net.ParsePortRange("10000-10200") if err != nil { @@ -262,3 +317,358 @@ func TestNewFromSnapshot(t *testing.T) { } } } + +func Test_calculateRangeOffset(t *testing.T) { + type args struct { + pr net.PortRange + } + tests := []struct { + name string + args args + want int + }{ + { + name: "default node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 2768, + }, + }, + want: 86, + }, + { + name: "very small node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 10, + }, + }, + want: 0, + }, + { + name: "small node port range (lower boundary)", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 16, + }, + }, + want: 0, + }, + { + name: "small node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 128, + }, + }, + want: 16, + }, + { + name: "medium node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 2048, + }, + }, + want: 64, + }, + { + name: "large node port range (upper boundary)", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 4096, + }, + }, + want: 128, + }, + { + name: "large node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 8192, + }, + }, + want: 128, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := calculateRangeOffset(tt.args.pr); got != tt.want { + t.Errorf("calculateRangeOffset() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNodePortMetrics(t *testing.T) { + clearMetrics() + // create node port allocator + portRange := "30000-32767" + pr, err := net.ParsePortRange(portRange) + if err != nil { + t.Fatal(err) + } + + a, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + a.EnableMetrics() + + // Check initial state + em := testMetrics{ + free: 0, + used: 0, + allocated: 0, + errors: 0, + } + expectMetrics(t, em) + + // allocate 2 ports + found := sets.NewInt() + for i := 0; i < 2; i++ { + port, err := a.AllocateNext() + if err != nil { + t.Fatal(err) + } + if found.Has(port) { + t.Fatalf("already reserved: %d", port) + } + found.Insert(port) + } + + em = testMetrics{ + free: 2768 - 2, + used: 2, + allocated: 2, + errors: 0, + } + expectMetrics(t, em) + + // try to allocate the same ports + for s := range found { + if !a.Has(s) { + t.Fatalf("missing: %d", s) + } + if err := a.Allocate(s); err != ErrAllocated { + t.Fatal(err) + } + } + em = testMetrics{ + free: 2768 - 2, + used: 2, + allocated: 2, + errors: 2, + } + expectMetrics(t, em) + + // release the ports allocated + for s := range found { + if !a.Has(s) { + t.Fatalf("missing: %d", s) + } + if err := a.Release(s); err != nil { + t.Fatal(err) + } + } + em = testMetrics{ + free: 2768, + used: 0, + allocated: 2, + errors: 2, + } + expectMetrics(t, em) + + // allocate 3000 ports for each allocator + // the full range and 232 more (2768 + 232 = 3000) + for i := 0; i < 3000; i++ { + a.AllocateNext() + } + em = testMetrics{ + free: 0, + used: 2768, + allocated: 2768 + 2, // this is a counter, we already had 2 allocations and we did 2768 more + errors: 232 + 2, // this is a counter, we already had 2 errors and we did 232 more + } + expectMetrics(t, em) +} + +func TestNodePortAllocatedMetrics(t *testing.T) { + clearMetrics() + + // create NodePort allocator + portRange := "30000-32767" + pr, err := net.ParsePortRange(portRange) + if err != nil { + t.Fatal(err) + } + + a, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + a.EnableMetrics() + + em := testMetrics{ + free: 0, + used: 0, + allocated: 0, + errors: 0, + } + expectMetrics(t, em) + + // allocate 2 dynamic port + found := sets.NewInt() + for i := 0; i < 2; i++ { + port, err := a.AllocateNext() + if err != nil { + t.Fatal(err) + } + if found.Has(port) { + t.Fatalf("already reserved: %d", port) + } + found.Insert(port) + } + + dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err) + } + if dynamicAllocated != 2 { + t.Fatalf("Expected 2 received %f", dynamicAllocated) + } + + // try to allocate the same ports + for s := range found { + if !a.Has(s) { + t.Fatalf("missing: %d", s) + } + if err := a.Allocate(s); err != ErrAllocated { + t.Fatal(err) + } + } + + staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err) + } + if staticErrors != 2 { + t.Fatalf("Expected 2 received %f", staticErrors) + } +} + +func TestMetricsDisabled(t *testing.T) { + clearMetrics() + + // create NodePort allocator + portRange := "30000-32766" + pr, err := net.ParsePortRange(portRange) + if err != nil { + t.Fatal(err) + } + + a, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + a.EnableMetrics() + + // create metrics disabled allocator with same port range + // this metrics should be ignored + b, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + + // Check initial state + em := testMetrics{ + free: 0, + used: 0, + allocated: 0, + errors: 0, + } + expectMetrics(t, em) + + // allocate in metrics enabled allocator + for i := 0; i < 100; i++ { + _, err := a.AllocateNext() + if err != nil { + t.Fatal(err) + } + } + em = testMetrics{ + free: 2767 - 100, + used: 100, + allocated: 100, + errors: 0, + } + expectMetrics(t, em) + + // allocate in metrics disabled allocator + for i := 0; i < 200; i++ { + _, err := b.AllocateNext() + if err != nil { + t.Fatal(err) + } + } + // the metrics should not be changed + expectMetrics(t, em) +} + +// Metrics helpers +func clearMetrics() { + nodePortAllocated.Set(0) + nodePortAvailable.Set(0) + nodePortAllocations.Reset() + nodePortAllocationErrors.Reset() +} + +type testMetrics struct { + free float64 + used float64 + allocated float64 + errors float64 +} + +func expectMetrics(t *testing.T, em testMetrics) { + var m testMetrics + var err error + m.free, err = testutil.GetGaugeMetricValue(nodePortAvailable) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAvailable.Name, err) + } + m.used, err = testutil.GetGaugeMetricValue(nodePortAllocated) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocated.Name, err) + } + staticAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("static")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err) + } + staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err) + } + dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err) + } + dynamicErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("dynamic")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err) + } + + m.allocated = staticAllocated + dynamicAllocated + m.errors = staticErrors + dynamicErrors + + if m != em { + t.Fatalf("metrics error: expected %v, received %v", em, m) + } +} diff --git a/pkg/registry/core/service/portallocator/metrics.go b/pkg/registry/core/service/portallocator/metrics.go new file mode 100644 index 00000000000..b2efa4ce1bb --- /dev/null +++ b/pkg/registry/core/service/portallocator/metrics.go @@ -0,0 +1,120 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package portallocator + +import ( + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + namespace = "kube_apiserver" + subsystem = "nodeport_allocator" +) + +var ( + // nodePortAllocated indicates the amount of ports allocated by NodePort Service. + nodePortAllocated = metrics.NewGauge( + &metrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "allocated_ports", + Help: "Gauge measuring the number of allocated NodePorts for Services", + StabilityLevel: metrics.ALPHA, + }, + ) + // nodePortAvailable indicates the amount of ports available by NodePort Service. + nodePortAvailable = metrics.NewGauge( + &metrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "available_ports", + Help: "Gauge measuring the number of available NodePorts for Services", + StabilityLevel: metrics.ALPHA, + }, + ) + // nodePortAllocation counts the total number of ports allocation and allocation mode: static or dynamic. + nodePortAllocations = metrics.NewCounterVec( + &metrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "allocation_total", + Help: "Number of NodePort allocations", + StabilityLevel: metrics.ALPHA, + }, + []string{"scope"}, + ) + // nodePortAllocationErrors counts the number of error trying to allocate a nodePort and allocation mode: static or dynamic. + nodePortAllocationErrors = metrics.NewCounterVec( + &metrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "allocation_errors_total", + Help: "Number of errors trying to allocate NodePort", + StabilityLevel: metrics.ALPHA, + }, + []string{"scope"}, + ) +) + +var registerMetricsOnce sync.Once + +func registerMetrics() { + registerMetricsOnce.Do(func() { + legacyregistry.MustRegister(nodePortAllocated) + legacyregistry.MustRegister(nodePortAvailable) + legacyregistry.MustRegister(nodePortAllocations) + legacyregistry.MustRegister(nodePortAllocationErrors) + }) +} + +// metricsRecorderInterface is the interface to record metrics. +type metricsRecorderInterface interface { + setAllocated(allocated int) + setAvailable(available int) + incrementAllocations(scope string) + incrementAllocationErrors(scope string) +} + +// metricsRecorder implements metricsRecorderInterface. +type metricsRecorder struct{} + +func (m *metricsRecorder) setAllocated(allocated int) { + nodePortAllocated.Set(float64(allocated)) +} + +func (m *metricsRecorder) setAvailable(available int) { + nodePortAvailable.Set(float64(available)) +} + +func (m *metricsRecorder) incrementAllocations(scope string) { + nodePortAllocations.WithLabelValues(scope).Inc() +} + +func (m *metricsRecorder) incrementAllocationErrors(scope string) { + nodePortAllocationErrors.WithLabelValues(scope).Inc() +} + +// emptyMetricsRecorder is a null object implements metricsRecorderInterface. +type emptyMetricsRecorder struct{} + +func (*emptyMetricsRecorder) setAllocated(allocated int) {} +func (*emptyMetricsRecorder) setAvailable(available int) {} +func (*emptyMetricsRecorder) incrementAllocations(scope string) {} +func (*emptyMetricsRecorder) incrementAllocationErrors(scope string) {} diff --git a/pkg/registry/core/service/portallocator/storage/storage_test.go b/pkg/registry/core/service/portallocator/storage/storage_test.go index 563f6f429ec..6bf30bbeee2 100644 --- a/pkg/registry/core/service/portallocator/storage/storage_test.go +++ b/pkg/registry/core/service/portallocator/storage/storage_test.go @@ -27,8 +27,11 @@ import ( "k8s.io/apiserver/pkg/storage" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/storage/storagebackend/factory" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" api "k8s.io/kubernetes/pkg/apis/core" _ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/core/service/allocator" allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" @@ -46,8 +49,8 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter serviceNodePortRange := utilnet.PortRange{Base: basePortRange, Size: sizePortRange} configForAllocations := etcdStorage.ForResource(api.Resource("servicenodeportallocations")) var backing allocator.Interface - storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { - mem := allocator.NewAllocationMap(max, rangeSpec) + storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { + mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) backing = mem etcd, err := allocatorstore.NewEtcd(mem, "/ranges/servicenodeports", configForAllocations) if err != nil { @@ -183,3 +186,79 @@ func TestReallocate(t *testing.T) { } } + +func TestAllocateReserved(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)() + + _, storage, _, si, destroyFunc := newStorage(t) + defer destroyFunc() + if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // allocate all ports on the dynamic block + // default range size is 2768, and dynamic block size is min(max(16,2768/32),128) = 86 + dynamicOffset := 86 + dynamicBlockSize := sizePortRange - dynamicOffset + for i := 0; i < dynamicBlockSize; i++ { + if _, err := storage.AllocateNext(); err != nil { + t.Errorf("Unexpected error trying to allocate: %v", err) + } + } + for i := dynamicOffset; i < sizePortRange; i++ { + port := i + basePortRange + if !storage.Has(port) { + t.Errorf("Port %d expected to be allocated", port) + } + } + + // allocate all ports on the static block + for i := 0; i < dynamicOffset; i++ { + port := i + basePortRange + if err := storage.Allocate(port); err != nil { + t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err) + } + } + if _, err := storage.AllocateNext(); err == nil { + t.Error("Allocator expected to be full") + } + // release one port in the allocated block and another a new one randomly + if err := storage.Release(basePortRange + 53); err != nil { + t.Fatalf("Unexpected error trying to release port 30053: %v", err) + } + if _, err := storage.AllocateNext(); err != nil { + t.Error(err) + } + if _, err := storage.AllocateNext(); err == nil { + t.Error("Allocator expected to be full") + } +} + +func TestAllocateReservedDynamicBlockExhausted(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)() + + _, storage, _, si, destroyFunc := newStorage(t) + defer destroyFunc() + if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // allocate all ports both on the dynamic and reserved blocks + // once the dynamic block has been exhausted + // the dynamic allocator will use the reserved block + for i := 0; i < sizePortRange; i++ { + if _, err := storage.AllocateNext(); err != nil { + t.Errorf("Unexpected error trying to allocate: %v", err) + } + } + for i := 0; i < sizePortRange; i++ { + port := i + basePortRange + if !storage.Has(port) { + t.Errorf("Port %d expected to be allocated", port) + } + } + + if _, err := storage.AllocateNext(); err == nil { + t.Error("Allocator expected to be full") + } +}