diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index eb1e324c401..d80298ae9f1 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -783,6 +783,13 @@ const ( // Subdivide the ClusterIP range for dynamic and static IP allocation. ServiceIPStaticSubrange featuregate.Feature = "ServiceIPStaticSubrange" + // owner: @xuzhenglun + // kep: http://kep.k8s.io/3682 + // alpha: v1.27 + // + // Subdivide the NodePort range for dynamic and static port allocation. + ServiceNodePortStaticSubrange featuregate.Feature = "ServiceNodePortStaticSubrange" + // owner: @derekwaynecarr // alpha: v1.20 // beta: v1.22 @@ -1123,6 +1130,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS ServiceInternalTrafficPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28 + ServiceNodePortStaticSubrange: {Default: false, PreRelease: featuregate.Alpha}, + SizeMemoryBackedVolumes: {Default: true, PreRelease: featuregate.Beta}, StatefulSetAutoDeletePVC: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 206e87e8909..1e528c074dc 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -237,8 +237,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource } var serviceNodePortRegistry rangeallocation.RangeRegistry - serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { - mem := allocator.NewAllocationMap(max, rangeSpec) + serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { + mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) // TODO etcdallocator package to return a storage interface via the storageFactory etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations"))) if err != nil { diff --git a/pkg/registry/core/service/portallocator/allocator.go b/pkg/registry/core/service/portallocator/allocator.go index 4f6cb68caf4..1c640a2edd3 100644 --- a/pkg/registry/core/service/portallocator/allocator.go +++ b/pkg/registry/core/service/portallocator/allocator.go @@ -21,10 +21,11 @@ import ( "fmt" "k8s.io/apimachinery/pkg/util/net" - api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/registry/core/service/allocator" - + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" + api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/registry/core/service/allocator" ) // Interface manages the allocation of ports out of a range. Interface @@ -62,22 +63,32 @@ type PortAllocator struct { var _ Interface = &PortAllocator{} // New creates a PortAllocator over a net.PortRange, calling allocatorFactory to construct the backing store. -func New(pr net.PortRange, allocatorFactory allocator.AllocatorFactory) (*PortAllocator, error) { +func New(pr net.PortRange, allocatorFactory allocator.AllocatorWithOffsetFactory) (*PortAllocator, error) { max := pr.Size rangeSpec := pr.String() a := &PortAllocator{ portRange: pr, } + + var offset = 0 + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceNodePortStaticSubrange) { + offset = calculateRangeOffset(pr) + } + var err error - a.alloc, err = allocatorFactory(max, rangeSpec) + a.alloc, err = allocatorFactory(max, rangeSpec, offset) + if err != nil { + return nil, err + } + return a, err } // NewInMemory creates an in-memory allocator. func NewInMemory(pr net.PortRange) (*PortAllocator, error) { - return New(pr, func(max int, rangeSpec string) (allocator.Interface, error) { - return allocator.NewAllocationMap(max, rangeSpec), nil + return New(pr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { + return allocator.NewAllocationMapWithOffset(max, rangeSpec, offset), nil }) } @@ -213,3 +224,31 @@ func (r *PortAllocator) contains(port int) (bool, int) { func (r *PortAllocator) Destroy() { r.alloc.Destroy() } + +// calculateRangeOffset estimates the offset used on the range for statically allocation based on +// the following formula `min(max($min, rangeSize/$step), $max)`, described as ~never less than +// $min or more than $max, with a graduated step function between them~. The function returns 0 +// if any of the parameters is invalid. +func calculateRangeOffset(pr net.PortRange) int { + // default values for min(max($min, rangeSize/$step), $max) + const ( + min = 16 + max = 128 + step = 32 + ) + + rangeSize := pr.Size + // offset should always be smaller than the range size + if rangeSize <= min { + return 0 + } + + offset := rangeSize / step + if offset < min { + return min + } + if offset > max { + return max + } + return int(offset) +} diff --git a/pkg/registry/core/service/portallocator/allocator_test.go b/pkg/registry/core/service/portallocator/allocator_test.go index 44b76dfbd66..b78b878640e 100644 --- a/pkg/registry/core/service/portallocator/allocator_test.go +++ b/pkg/registry/core/service/portallocator/allocator_test.go @@ -17,13 +17,15 @@ limitations under the License. package portallocator import ( - "testing" - "strconv" + "testing" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" ) func TestAllocate(t *testing.T) { @@ -118,6 +120,58 @@ func TestAllocate(t *testing.T) { } } +func TestAllocateReserved(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)() + + pr, err := net.ParsePortRange("30000-30128") + if err != nil { + t.Fatal(err) + } + + r, err := NewInMemory(*pr) + if err != nil { + t.Fatal(err) + } + // allocate all ports on the dynamic block + // dynamic block size is min(max(16,128/32),128) = 16 + dynamicOffset := calculateRangeOffset(*pr) + dynamicBlockSize := pr.Size - dynamicOffset + for i := 0; i < dynamicBlockSize; i++ { + if _, err := r.AllocateNext(); err != nil { + t.Errorf("Unexpected error trying to allocate: %v", err) + } + } + for i := dynamicOffset; i < pr.Size; i++ { + port := i + pr.Base + if !r.Has(port) { + t.Errorf("Port %d expected to be allocated", port) + } + } + if f := r.Free(); f != dynamicOffset { + t.Errorf("expected %d free ports, got %d", dynamicOffset, f) + } + // allocate all ports on the static block + for i := 0; i < dynamicOffset; i++ { + port := i + pr.Base + if err := r.Allocate(port); err != nil { + t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err) + } + } + if f := r.Free(); f != 0 { + t.Errorf("expected free equal to 0 got: %d", f) + } + // release one port in the allocated block and another a new one randomly + if err := r.Release(30053); err != nil { + t.Fatalf("Unexpected error trying to release port 30053: %v", err) + } + if _, err := r.AllocateNext(); err != nil { + t.Error(err) + } + if f := r.Free(); f != 0 { + t.Errorf("expected free equal to 0 got: %d", f) + } +} + func TestForEach(t *testing.T) { pr, err := net.ParsePortRange("10000-10200") if err != nil { @@ -262,3 +316,92 @@ func TestNewFromSnapshot(t *testing.T) { } } } + +func Test_calculateRangeOffset(t *testing.T) { + type args struct { + pr net.PortRange + } + tests := []struct { + name string + args args + want int + }{ + { + name: "default node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 2768, + }, + }, + want: 86, + }, + { + name: "very small node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 10, + }, + }, + want: 0, + }, + { + name: "small node port range (lower boundary)", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 16, + }, + }, + want: 0, + }, + { + name: "small node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 128, + }, + }, + want: 16, + }, + { + name: "medium node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 2048, + }, + }, + want: 64, + }, + { + name: "large node port range (upper boundary)", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 4096, + }, + }, + want: 128, + }, + { + name: "large node port range", + args: args{ + pr: net.PortRange{ + Base: 30000, + Size: 8192, + }, + }, + want: 128, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := calculateRangeOffset(tt.args.pr); got != tt.want { + t.Errorf("calculateRangeOffset() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/registry/core/service/portallocator/storage/storage_test.go b/pkg/registry/core/service/portallocator/storage/storage_test.go index 563f6f429ec..b9a417f255f 100644 --- a/pkg/registry/core/service/portallocator/storage/storage_test.go +++ b/pkg/registry/core/service/portallocator/storage/storage_test.go @@ -27,8 +27,11 @@ import ( "k8s.io/apiserver/pkg/storage" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/storage/storagebackend/factory" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" api "k8s.io/kubernetes/pkg/apis/core" _ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/core/service/allocator" allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" @@ -46,8 +49,8 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter serviceNodePortRange := utilnet.PortRange{Base: basePortRange, Size: sizePortRange} configForAllocations := etcdStorage.ForResource(api.Resource("servicenodeportallocations")) var backing allocator.Interface - storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { - mem := allocator.NewAllocationMap(max, rangeSpec) + storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { + mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) backing = mem etcd, err := allocatorstore.NewEtcd(mem, "/ranges/servicenodeports", configForAllocations) if err != nil { @@ -183,3 +186,79 @@ func TestReallocate(t *testing.T) { } } + +func TestAllocateReserved(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)() + + _, storage, _, si, destroyFunc := newStorage(t) + defer destroyFunc() + if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // allocate all ports on the dynamic block + // default range size is 2768, and dynamic block size is min(max(16,2768/32),128) = 86 + dynamicOffset := 86 + dynamicBlockSize := sizePortRange - dynamicOffset + for i := 0; i < dynamicBlockSize; i++ { + if _, err := storage.AllocateNext(); err != nil { + t.Errorf("Unexpected error trying to allocate: %v", err) + } + } + for i := dynamicOffset; i < sizePortRange; i++ { + port := i + basePortRange + if !storage.Has(port) { + t.Errorf("Port %d expected to be allocated", port) + } + } + + // allocate all ports on the static block + for i := 0; i < dynamicOffset; i++ { + port := i + basePortRange + if err := storage.Allocate(port); err != nil { + t.Errorf("Unexpected error trying to allocate IP %d: %v", port, err) + } + } + if _, err := storage.AllocateNext(); err == nil { + t.Error("Allocator expected to be full") + } + // release one port in the allocated block and another a new one randomly + if err := storage.Release(basePortRange + 53); err != nil { + t.Fatalf("Unexpected error trying to release ip 30053: %v", err) + } + if _, err := storage.AllocateNext(); err != nil { + t.Error(err) + } + if _, err := storage.AllocateNext(); err == nil { + t.Error("Allocator expected to be full") + } +} + +func TestAllocateReservedDynamicBlockExhausted(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)() + + _, storage, _, si, destroyFunc := newStorage(t) + defer destroyFunc() + if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // allocate all ports both on the dynamic and reserved blocks + // once the dynamic block has been exhausted + // the dynamic allocator will use the reserved block + for i := 0; i < sizePortRange; i++ { + if _, err := storage.AllocateNext(); err != nil { + t.Errorf("Unexpected error trying to allocate: %v", err) + } + } + for i := 0; i < sizePortRange; i++ { + port := i + basePortRange + if !storage.Has(port) { + t.Errorf("Port %d expected to be allocated", port) + } + } + + if _, err := storage.AllocateNext(); err == nil { + t.Error("Allocator expected to be full") + } +}