allocate nodeport with offset

This commit is contained in:
xuzhenglun 2022-12-12 13:34:40 +08:00
parent 23c65ec590
commit c18c6e1b87
No known key found for this signature in database
GPG Key ID: 43BD51D11716234A
5 changed files with 283 additions and 13 deletions

View File

@ -783,6 +783,13 @@ const (
// Subdivide the ClusterIP range for dynamic and static IP allocation.
ServiceIPStaticSubrange featuregate.Feature = "ServiceIPStaticSubrange"
// owner: @xuzhenglun
// kep: http://kep.k8s.io/3682
// alpha: v1.27
//
// Subdivide the NodePort range for dynamic and static port allocation.
ServiceNodePortStaticSubrange featuregate.Feature = "ServiceNodePortStaticSubrange"
// owner: @derekwaynecarr
// alpha: v1.20
// beta: v1.22
@ -1123,6 +1130,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ServiceInternalTrafficPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28
ServiceNodePortStaticSubrange: {Default: false, PreRelease: featuregate.Alpha},
SizeMemoryBackedVolumes: {Default: true, PreRelease: featuregate.Beta},
StatefulSetAutoDeletePVC: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -237,8 +237,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
}
var serviceNodePortRegistry rangeallocation.RangeRegistry
serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
mem := allocator.NewAllocationMap(max, rangeSpec)
serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
if err != nil {

View File

@ -21,10 +21,11 @@ import (
"fmt"
"k8s.io/apimachinery/pkg/util/net"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
)
// Interface manages the allocation of ports out of a range. Interface
@ -62,22 +63,32 @@ type PortAllocator struct {
var _ Interface = &PortAllocator{}
// New creates a PortAllocator over a net.PortRange, calling allocatorFactory to construct the backing store.
func New(pr net.PortRange, allocatorFactory allocator.AllocatorFactory) (*PortAllocator, error) {
func New(pr net.PortRange, allocatorFactory allocator.AllocatorWithOffsetFactory) (*PortAllocator, error) {
max := pr.Size
rangeSpec := pr.String()
a := &PortAllocator{
portRange: pr,
}
var offset = 0
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceNodePortStaticSubrange) {
offset = calculateRangeOffset(pr)
}
var err error
a.alloc, err = allocatorFactory(max, rangeSpec)
a.alloc, err = allocatorFactory(max, rangeSpec, offset)
if err != nil {
return nil, err
}
return a, err
}
// NewInMemory creates an in-memory allocator.
func NewInMemory(pr net.PortRange) (*PortAllocator, error) {
return New(pr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewAllocationMap(max, rangeSpec), nil
return New(pr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
return allocator.NewAllocationMapWithOffset(max, rangeSpec, offset), nil
})
}
@ -213,3 +224,31 @@ func (r *PortAllocator) contains(port int) (bool, int) {
func (r *PortAllocator) Destroy() {
r.alloc.Destroy()
}
// calculateRangeOffset estimates the offset used on the range for statically allocation based on
// the following formula `min(max($min, rangeSize/$step), $max)`, described as ~never less than
// $min or more than $max, with a graduated step function between them~. The function returns 0
// if any of the parameters is invalid.
func calculateRangeOffset(pr net.PortRange) int {
// default values for min(max($min, rangeSize/$step), $max)
const (
min = 16
max = 128
step = 32
)
rangeSize := pr.Size
// offset should always be smaller than the range size
if rangeSize <= min {
return 0
}
offset := rangeSize / step
if offset < min {
return min
}
if offset > max {
return max
}
return int(offset)
}

View File

@ -17,13 +17,15 @@ limitations under the License.
package portallocator
import (
"testing"
"strconv"
"testing"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
)
func TestAllocate(t *testing.T) {
@ -118,6 +120,58 @@ func TestAllocate(t *testing.T) {
}
}
func TestAllocateReserved(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)()
pr, err := net.ParsePortRange("30000-30128")
if err != nil {
t.Fatal(err)
}
r, err := NewInMemory(*pr)
if err != nil {
t.Fatal(err)
}
// allocate all ports on the dynamic block
// dynamic block size is min(max(16,128/32),128) = 16
dynamicOffset := calculateRangeOffset(*pr)
dynamicBlockSize := pr.Size - dynamicOffset
for i := 0; i < dynamicBlockSize; i++ {
if _, err := r.AllocateNext(); err != nil {
t.Errorf("Unexpected error trying to allocate: %v", err)
}
}
for i := dynamicOffset; i < pr.Size; i++ {
port := i + pr.Base
if !r.Has(port) {
t.Errorf("Port %d expected to be allocated", port)
}
}
if f := r.Free(); f != dynamicOffset {
t.Errorf("expected %d free ports, got %d", dynamicOffset, f)
}
// allocate all ports on the static block
for i := 0; i < dynamicOffset; i++ {
port := i + pr.Base
if err := r.Allocate(port); err != nil {
t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err)
}
}
if f := r.Free(); f != 0 {
t.Errorf("expected free equal to 0 got: %d", f)
}
// release one port in the allocated block and another a new one randomly
if err := r.Release(30053); err != nil {
t.Fatalf("Unexpected error trying to release port 30053: %v", err)
}
if _, err := r.AllocateNext(); err != nil {
t.Error(err)
}
if f := r.Free(); f != 0 {
t.Errorf("expected free equal to 0 got: %d", f)
}
}
func TestForEach(t *testing.T) {
pr, err := net.ParsePortRange("10000-10200")
if err != nil {
@ -262,3 +316,92 @@ func TestNewFromSnapshot(t *testing.T) {
}
}
}
func Test_calculateRangeOffset(t *testing.T) {
type args struct {
pr net.PortRange
}
tests := []struct {
name string
args args
want int
}{
{
name: "default node port range",
args: args{
pr: net.PortRange{
Base: 30000,
Size: 2768,
},
},
want: 86,
},
{
name: "very small node port range",
args: args{
pr: net.PortRange{
Base: 30000,
Size: 10,
},
},
want: 0,
},
{
name: "small node port range (lower boundary)",
args: args{
pr: net.PortRange{
Base: 30000,
Size: 16,
},
},
want: 0,
},
{
name: "small node port range",
args: args{
pr: net.PortRange{
Base: 30000,
Size: 128,
},
},
want: 16,
},
{
name: "medium node port range",
args: args{
pr: net.PortRange{
Base: 30000,
Size: 2048,
},
},
want: 64,
},
{
name: "large node port range (upper boundary)",
args: args{
pr: net.PortRange{
Base: 30000,
Size: 4096,
},
},
want: 128,
},
{
name: "large node port range",
args: args{
pr: net.PortRange{
Base: 30000,
Size: 8192,
},
},
want: 128,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := calculateRangeOffset(tt.args.pr); got != tt.want {
t.Errorf("calculateRangeOffset() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -27,8 +27,11 @@ import (
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
api "k8s.io/kubernetes/pkg/apis/core"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
@ -46,8 +49,8 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter
serviceNodePortRange := utilnet.PortRange{Base: basePortRange, Size: sizePortRange}
configForAllocations := etcdStorage.ForResource(api.Resource("servicenodeportallocations"))
var backing allocator.Interface
storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
mem := allocator.NewAllocationMap(max, rangeSpec)
storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
backing = mem
etcd, err := allocatorstore.NewEtcd(mem, "/ranges/servicenodeports", configForAllocations)
if err != nil {
@ -183,3 +186,79 @@ func TestReallocate(t *testing.T) {
}
}
func TestAllocateReserved(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)()
_, storage, _, si, destroyFunc := newStorage(t)
defer destroyFunc()
if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// allocate all ports on the dynamic block
// default range size is 2768, and dynamic block size is min(max(16,2768/32),128) = 86
dynamicOffset := 86
dynamicBlockSize := sizePortRange - dynamicOffset
for i := 0; i < dynamicBlockSize; i++ {
if _, err := storage.AllocateNext(); err != nil {
t.Errorf("Unexpected error trying to allocate: %v", err)
}
}
for i := dynamicOffset; i < sizePortRange; i++ {
port := i + basePortRange
if !storage.Has(port) {
t.Errorf("Port %d expected to be allocated", port)
}
}
// allocate all ports on the static block
for i := 0; i < dynamicOffset; i++ {
port := i + basePortRange
if err := storage.Allocate(port); err != nil {
t.Errorf("Unexpected error trying to allocate IP %d: %v", port, err)
}
}
if _, err := storage.AllocateNext(); err == nil {
t.Error("Allocator expected to be full")
}
// release one port in the allocated block and another a new one randomly
if err := storage.Release(basePortRange + 53); err != nil {
t.Fatalf("Unexpected error trying to release ip 30053: %v", err)
}
if _, err := storage.AllocateNext(); err != nil {
t.Error(err)
}
if _, err := storage.AllocateNext(); err == nil {
t.Error("Allocator expected to be full")
}
}
func TestAllocateReservedDynamicBlockExhausted(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)()
_, storage, _, si, destroyFunc := newStorage(t)
defer destroyFunc()
if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// allocate all ports both on the dynamic and reserved blocks
// once the dynamic block has been exhausted
// the dynamic allocator will use the reserved block
for i := 0; i < sizePortRange; i++ {
if _, err := storage.AllocateNext(); err != nil {
t.Errorf("Unexpected error trying to allocate: %v", err)
}
}
for i := 0; i < sizePortRange; i++ {
port := i + basePortRange
if !storage.Has(port) {
t.Errorf("Port %d expected to be allocated", port)
}
}
if _, err := storage.AllocateNext(); err == nil {
t.Error("Allocator expected to be full")
}
}