Add a feature gate for prioritized Service IP allocation

Add the ServiceIPStaticSubrange feature gate to enable a new strategy
in the Service IP allocators: the IP range is subdivided, and
dynamically allocated addresses are assigned preferentially from
the upper band of the range.
Antonio Ojea 2022-01-11 09:31:33 +01:00
parent 96d71f01eb
commit ec0881a920
6 changed files with 337 additions and 36 deletions
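
To make the strategy concrete: below is a standalone sketch of the subdivision this commit introduces, using the same min(max(16, cidrSize/16), 256) formula as the calculateRangeOffset function added further down. rangeOffset and the package layout here are illustrative, not the allocator's API.

```go
package main

import (
	"fmt"
	"net"
)

// rangeOffset mirrors the formula this commit uses: min(max(16, size/16), 256).
// Addresses below the offset form the band preferred for static allocation;
// dynamic allocation starts above it.
func rangeOffset(cidr *net.IPNet) int64 {
	const min, max, step = 16, 256, 16
	ones, bits := cidr.Mask.Size()
	if bits-ones >= 63 { // avoid int64 overflow on very large IPv6 ranges
		return max
	}
	size := int64(1) << uint(bits-ones) // total addresses in the CIDR
	if size < min {
		return 0 // too small to subdivide
	}
	offset := size / step
	if offset < min {
		return min
	}
	if offset > max {
		return max
	}
	return offset
}

func main() {
	_, cidr, _ := net.ParseCIDR("10.0.0.0/24")
	// For a /24: min(max(16, 256/16), 256) = 16, so 10.0.0.1-10.0.0.16 is kept
	// for static requests and dynamic allocation prefers 10.0.0.17-10.0.0.254.
	fmt.Println("static band size:", rangeOffset(cidr))
}
```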

View File

@ -837,6 +837,12 @@ const (
//
// Enable MinDomains in Pod Topology Spread.
MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread"
+ // owner: @aojea
+ // kep: http://kep.k8s.io/3070
+ // alpha: v1.24
+ //
+ // Subdivide the ClusterIP range for dynamic and static IP allocation.
+ ServiceIPStaticSubrange featuregate.Feature = "ServiceIPStaticSubrange"
)
func init() {
@ -959,6 +965,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
GRPCContainerProbe: {Default: false, PreRelease: featuregate.Alpha},
LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.Beta},
MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Alpha},
+ ServiceIPStaticSubrange: {Default: false, PreRelease: featuregate.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
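
As with any alpha gate, ServiceIPStaticSubrange defaults to off; following the standard feature-gate mechanics, it can be turned on with the kube-apiserver flag `--feature-gates=ServiceIPStaticSubrange=true`.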

View File

@ -199,8 +199,9 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
- serviceClusterIPAllocator, err := ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
- mem := allocator.NewAllocationMap(max, rangeSpec)
+ serviceClusterIPAllocator, err := ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
+ var mem allocator.Snapshottable
+ mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
if err != nil {
@ -218,8 +219,9 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
var secondaryServiceClusterIPAllocator ipallocator.Interface
if c.SecondaryServiceIPRange.IP != nil {
var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
- secondaryServiceClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
- mem := allocator.NewAllocationMap(max, rangeSpec)
+ secondaryServiceClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
+ var mem allocator.Snapshottable
+ mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
if err != nil {
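
The callbacks above now take a third offset parameter. The AllocatorWithOffsetFactory type they satisfy is defined in the allocator package rather than in this diff; a sketch of its presumed shape, with the allocator Interface stubbed for illustration:

```go
package allocator

// Interface is stubbed here for illustration; the real one in
// pkg/registry/core/service/allocator has additional methods.
type Interface interface {
	Allocate(offset int) (bool, error)
	AllocateNext() (int, bool, error)
	Release(offset int) error
	Has(offset int) bool
	Free() int
}

// AllocatorFactory is the previous, offset-less factory signature.
type AllocatorFactory func(max int, rangeSpec string) (Interface, error)

// AllocatorWithOffsetFactory is the presumed new signature: the extra offset
// tells the backing bitmap where dynamic allocation should begin scanning.
type AllocatorWithOffsetFactory func(max int, rangeSpec string, offset int) (Interface, error)
```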

View File

@ -22,7 +22,9 @@ import (
"math/big"
"net"
+ utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
netutils "k8s.io/utils/net"
)
@ -86,7 +88,7 @@ type Range struct {
}
// New creates a Range over a net.IPNet, calling allocatorFactory to construct the backing store.
- func New(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) (*Range, error) {
+ func New(cidr *net.IPNet, allocatorFactory allocator.AllocatorWithOffsetFactory) (*Range, error) {
registerMetrics()
max := netutils.RangeSize(cidr)
@ -118,16 +120,24 @@ func New(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) (*Range,
max: maximum(0, int(max)),
family: family,
}
- var err error
- r.alloc, err = allocatorFactory(r.max, rangeSpec)
- return &r, err
+ offset := 0
+ if utilfeature.DefaultFeatureGate.Enabled(features.ServiceIPStaticSubrange) {
+ offset = calculateRangeOffset(cidr)
+ }
+ var err error
+ r.alloc, err = allocatorFactory(r.max, rangeSpec, offset)
+ if err != nil {
+ return nil, err
+ }
+ return &r, nil
}
// NewInMemory creates an in-memory allocator.
func NewInMemory(cidr *net.IPNet) (*Range, error) {
- return New(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
- return allocator.NewAllocationMap(max, rangeSpec), nil
+ return New(cidr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
+ return allocator.NewAllocationMapWithOffset(max, rangeSpec, offset), nil
})
}
@ -191,7 +201,7 @@ func (r *Range) allocate(ip net.IP, dryRun bool) error {
ok, offset := r.contains(ip)
if !ok {
// update metrics
- clusterIPAllocationErrors.WithLabelValues(label.String()).Inc()
+ clusterIPAllocationErrors.WithLabelValues(label.String(), "static").Inc()
return &ErrNotInRange{ip, r.net.String()}
}
if dryRun {
@ -203,18 +213,18 @@ func (r *Range) allocate(ip net.IP, dryRun bool) error {
allocated, err := r.alloc.Allocate(offset)
if err != nil {
// update metrics
- clusterIPAllocationErrors.WithLabelValues(label.String()).Inc()
+ clusterIPAllocationErrors.WithLabelValues(label.String(), "static").Inc()
return err
}
if !allocated {
// update metrics
- clusterIPAllocationErrors.WithLabelValues(label.String()).Inc()
+ clusterIPAllocationErrors.WithLabelValues(label.String(), "static").Inc()
return ErrAllocated
}
// update metrics
- clusterIPAllocations.WithLabelValues(label.String()).Inc()
+ clusterIPAllocations.WithLabelValues(label.String(), "static").Inc()
clusterIPAllocated.WithLabelValues(label.String()).Set(float64(r.Used()))
clusterIPAvailable.WithLabelValues(label.String()).Set(float64(r.Free()))
@ -238,18 +248,18 @@ func (r *Range) allocateNext(dryRun bool) (net.IP, error) {
offset, ok, err := r.alloc.AllocateNext()
if err != nil {
// update metrics
- clusterIPAllocationErrors.WithLabelValues(label.String()).Inc()
+ clusterIPAllocationErrors.WithLabelValues(label.String(), "dynamic").Inc()
return nil, err
}
if !ok {
// update metrics
- clusterIPAllocationErrors.WithLabelValues(label.String()).Inc()
+ clusterIPAllocationErrors.WithLabelValues(label.String(), "dynamic").Inc()
return nil, ErrFull
}
// update metrics
- clusterIPAllocations.WithLabelValues(label.String()).Inc()
+ clusterIPAllocations.WithLabelValues(label.String(), "dynamic").Inc()
clusterIPAllocated.WithLabelValues(label.String()).Set(float64(r.Used()))
clusterIPAvailable.WithLabelValues(label.String()).Set(float64(r.Free()))
@ -354,6 +364,33 @@ func calculateIPOffset(base *big.Int, ip net.IP) int {
return int(big.NewInt(0).Sub(netutils.BigForIP(ip), base).Int64())
}
+ // calculateRangeOffset estimates the offset used by the range for static allocation,
+ // based on the formula `min(max($min, cidrSize/$step), $max)`: never less than $min
+ // nor more than $max, with a graduated step function between them. For example, a
+ // /24 (256 addresses) yields min(max(16, 256/16), 256) = 16. The function returns 0
+ // if the range is too small to be subdivided.
+ func calculateRangeOffset(cidr *net.IPNet) int {
+ // default values for min(max($min, cidrSize/$step), $max)
+ const (
+ min = 16
+ max = 256
+ step = 16
+ )
+ cidrSize := netutils.RangeSize(cidr)
+ if cidrSize < min {
+ return 0
+ }
+ offset := cidrSize / step
+ if offset < min {
+ return min
+ }
+ if offset > max {
+ return max
+ }
+ return int(offset)
+ }
// dryRunRange is a shim to satisfy Interface without persisting state.
type dryRunRange struct {
real *Range
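
A quick usage sketch of the updated in-memory allocator with the gate enabled; the expected addresses assume the /24 example and an offset of 16 as computed above. ipallocator is an internal Kubernetes package, so this is meant to be read in-tree rather than imported externally.

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
	netutils "k8s.io/utils/net"
)

func main() {
	_, cidr, _ := netutils.ParseCIDRSloppy("10.0.0.0/24")
	r, err := ipallocator.NewInMemory(cidr)
	if err != nil {
		panic(err)
	}
	// With ServiceIPStaticSubrange enabled, the dynamic allocation below
	// should return an address from the upper band, 10.0.0.17-10.0.0.254.
	ip, _ := r.AllocateNext()
	fmt.Println("dynamic:", ip)
	// A static request for an address in the lower band is still honored.
	if err := r.Allocate(netutils.ParseIPSloppy("10.0.0.5")); err != nil {
		panic(err)
	}
}
```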

View File

@ -17,12 +17,16 @@ limitations under the License.
package ipallocator
import (
"fmt"
"net"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
+ utilfeature "k8s.io/apiserver/pkg/util/feature"
+ featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
netutils "k8s.io/utils/net"
)
@ -176,6 +180,57 @@ func TestAllocateTiny(t *testing.T) {
}
}
+ func TestAllocateReserved(t *testing.T) {
+ defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceIPStaticSubrange, true)()
+ _, cidr, err := netutils.ParseCIDRSloppy("192.168.1.0/25")
+ if err != nil {
+ t.Fatal(err)
+ }
+ r, err := NewInMemory(cidr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // allocate all addresses in the dynamic block
+ // subnet /25 = 128 addresses; the static offset is min(max(16,128/16),256) = 16
+ dynamicOffset := calculateRangeOffset(cidr)
+ dynamicBlockSize := r.max - dynamicOffset
+ for i := 0; i < dynamicBlockSize; i++ {
+ if _, err := r.AllocateNext(); err != nil {
+ t.Errorf("Unexpected error trying to allocate: %v", err)
+ }
+ }
+ for i := dynamicOffset; i < r.max; i++ {
+ ip := fmt.Sprintf("192.168.1.%d", i+1)
+ if !r.Has(netutils.ParseIPSloppy(ip)) {
+ t.Errorf("IP %s expected to be allocated", ip)
+ }
+ }
+ if f := r.Free(); f != dynamicOffset {
+ t.Errorf("expected %d free addresses, got %d", dynamicOffset, f)
+ }
+ // allocate all addresses in the static block
+ for i := 0; i < dynamicOffset; i++ {
+ ip := fmt.Sprintf("192.168.1.%d", i+1)
+ if err := r.Allocate(netutils.ParseIPSloppy(ip)); err != nil {
+ t.Errorf("Unexpected error trying to allocate IP %s: %v", ip, err)
+ }
+ }
+ if f := r.Free(); f != 0 {
+ t.Errorf("expected free equal to 0 got: %d", f)
+ }
+ // release one address from the static block and verify AllocateNext reuses it
+ if err := r.Release(netutils.ParseIPSloppy("192.168.1.10")); err != nil {
+ t.Fatalf("Unexpected error trying to release ip 192.168.1.10: %v", err)
+ }
+ if _, err := r.AllocateNext(); err != nil {
+ t.Error(err)
+ }
+ if f := r.Free(); f != 0 {
+ t.Errorf("expected free equal to 0 got: %d", f)
+ }
+ }
func TestAllocateSmall(t *testing.T) {
_, cidr, err := netutils.ParseCIDRSloppy("192.168.1.240/30")
if err != nil {
@ -186,7 +241,7 @@ func TestAllocateSmall(t *testing.T) {
t.Fatal(err)
}
if f := r.Free(); f != 2 {
t.Errorf("free: %d", f)
t.Errorf("expected free equal to 2 got: %d", f)
}
found := sets.NewString()
for i := 0; i < 2; i++ {
@ -195,7 +250,7 @@ func TestAllocateSmall(t *testing.T) {
t.Fatal(err)
}
if found.Has(ip.String()) {
t.Fatalf("already reserved: %s", ip)
t.Fatalf("address %s has been already allocated", ip)
}
found.Insert(ip.String())
}
@ -213,8 +268,12 @@ func TestAllocateSmall(t *testing.T) {
}
}
- if r.Free() != 0 && r.max != 2 {
- t.Fatalf("unexpected range: %v", r)
+ if f := r.Free(); f != 0 {
+ t.Errorf("expected free equal to 0 got: %d", f)
+ }
+ if r.max != 2 {
+ t.Fatalf("expected range size of 2, got: %v", r)
}
t.Logf("allocated: %v", found)
@ -365,6 +424,9 @@ func TestNewFromSnapshot(t *testing.T) {
}
func TestClusterIPMetrics(t *testing.T) {
+ clearMetrics()
+ defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceIPStaticSubrange, true)()
// create IPv4 allocator
cidrIPv4 := "10.0.0.0/24"
_, clusterCIDRv4, _ := netutils.ParseCIDRSloppy(cidrIPv4)
@ -372,7 +434,6 @@ func TestClusterIPMetrics(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error creating CidrSet: %v", err)
}
clearMetrics(map[string]string{"cidr": cidrIPv4})
// create IPv6 allocator
cidrIPv6 := "2001:db8::/112"
_, clusterCIDRv6, _ := netutils.ParseCIDRSloppy(cidrIPv6)
@ -380,7 +441,6 @@ func TestClusterIPMetrics(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error creating CidrSet: %v", err)
}
clearMetrics(map[string]string{"cidr": cidrIPv6})
// Check initial state
em := testMetrics{
@ -475,12 +535,72 @@ func TestClusterIPMetrics(t *testing.T) {
expectMetrics(t, cidrIPv6, em)
}
+ func TestClusterIPAllocatedMetrics(t *testing.T) {
+ clearMetrics()
+ defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceIPStaticSubrange, true)()
+ // create IPv4 allocator
+ cidrIPv4 := "10.0.0.0/25"
+ _, clusterCIDRv4, _ := netutils.ParseCIDRSloppy(cidrIPv4)
+ a, err := NewInMemory(clusterCIDRv4)
+ if err != nil {
+ t.Fatalf("unexpected error creating CidrSet: %v", err)
+ }
+ em := testMetrics{
+ free: 0,
+ used: 0,
+ allocated: 0,
+ errors: 0,
+ }
+ expectMetrics(t, cidrIPv4, em)
+ // allocate 2 dynamic IPv4 addresses
+ found := sets.NewString()
+ for i := 0; i < 2; i++ {
+ ip, err := a.AllocateNext()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if found.Has(ip.String()) {
+ t.Fatalf("already reserved: %s", ip)
+ }
+ found.Insert(ip.String())
+ }
+ dynamicAllocated, err := testutil.GetCounterMetricValue(clusterIPAllocations.WithLabelValues(cidrIPv4, "dynamic"))
+ if err != nil {
+ t.Errorf("failed to get %s value, err: %v", clusterIPAllocations.Name, err)
+ }
+ if dynamicAllocated != 2 {
+ t.Fatalf("Expected 2 received %f", dynamicAllocated)
+ }
+ // try to allocate the same IP addresses
+ for s := range found {
+ if !a.Has(netutils.ParseIPSloppy(s)) {
+ t.Fatalf("missing: %s", s)
+ }
+ if err := a.Allocate(netutils.ParseIPSloppy(s)); err != ErrAllocated {
+ t.Fatal(err)
+ }
+ }
+ staticErrors, err := testutil.GetCounterMetricValue(clusterIPAllocationErrors.WithLabelValues(cidrIPv4, "static"))
+ if err != nil {
+ t.Errorf("failed to get %s value, err: %v", clusterIPAllocationErrors.Name, err)
+ }
+ if staticErrors != 2 {
+ t.Fatalf("Expected 2 received %f", staticErrors)
+ }
+ }
// Metrics helpers
- func clearMetrics(labels map[string]string) {
- clusterIPAllocated.Delete(labels)
- clusterIPAvailable.Delete(labels)
- clusterIPAllocations.Delete(labels)
- clusterIPAllocationErrors.Delete(labels)
+ func clearMetrics() {
+ clusterIPAllocated.Reset()
+ clusterIPAvailable.Reset()
+ clusterIPAllocations.Reset()
+ clusterIPAllocationErrors.Reset()
}
type testMetrics struct {
@ -501,14 +621,25 @@ func expectMetrics(t *testing.T, label string, em testMetrics) {
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPAllocated.Name, err)
}
- m.allocated, err = testutil.GetCounterMetricValue(clusterIPAllocations.WithLabelValues(label))
+ staticAllocated, err := testutil.GetCounterMetricValue(clusterIPAllocations.WithLabelValues(label, "static"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPAllocations.Name, err)
}
- m.errors, err = testutil.GetCounterMetricValue(clusterIPAllocationErrors.WithLabelValues(label))
+ staticErrors, err := testutil.GetCounterMetricValue(clusterIPAllocationErrors.WithLabelValues(label, "static"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPAllocationErrors.Name, err)
}
+ dynamicAllocated, err := testutil.GetCounterMetricValue(clusterIPAllocations.WithLabelValues(label, "dynamic"))
+ if err != nil {
+ t.Errorf("failed to get %s value, err: %v", clusterIPAllocations.Name, err)
+ }
+ dynamicErrors, err := testutil.GetCounterMetricValue(clusterIPAllocationErrors.WithLabelValues(label, "dynamic"))
+ if err != nil {
+ t.Errorf("failed to get %s value, err: %v", clusterIPAllocationErrors.Name, err)
+ }
+ m.allocated = staticAllocated + dynamicAllocated
+ m.errors = staticErrors + dynamicErrors
if m != em {
t.Fatalf("metrics error: expected %v, received %v", em, m)
@ -578,3 +709,75 @@ func TestDryRun(t *testing.T) {
})
}
}
+ func Test_calculateRangeOffset(t *testing.T) {
+ // default $min = 16, $max = 256 and $step = 16.
+ tests := []struct {
+ name string
+ cidr string
+ want int
+ }{
+ {
+ name: "full mask IPv4",
+ cidr: "192.168.1.1/32",
+ want: 0,
+ },
+ {
+ name: "full mask IPv6",
+ cidr: "fd00::1/128",
+ want: 0,
+ },
+ {
+ name: "very small mask IPv4",
+ cidr: "192.168.1.1/30",
+ want: 0,
+ },
+ {
+ name: "very small mask IPv6",
+ cidr: "fd00::1/126",
+ want: 0,
+ },
+ {
+ name: "small mask IPv4",
+ cidr: "192.168.1.1/28",
+ want: 16,
+ },
+ {
+ name: "small mask IPv6",
+ cidr: "fd00::1/122",
+ want: 16,
+ },
+ {
+ name: "medium mask IPv4",
+ cidr: "192.168.1.1/22",
+ want: 64,
+ },
+ {
+ name: "medium mask IPv6",
+ cidr: "fd00::1/118",
+ want: 64,
+ },
+ {
+ name: "large mask IPv4",
+ cidr: "192.168.1.1/8",
+ want: 256,
+ },
+ {
+ name: "large mask IPv6",
+ cidr: "fd00::1/12",
+ want: 256,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ _, cidr, err := netutils.ParseCIDRSloppy(tt.cidr)
+ if err != nil {
+ t.Fatalf("Unexpected error parsing CIDR %s: %v", tt.cidr, err)
+ }
+ if got := calculateRangeOffset(cidr); got != tt.want {
+ t.Errorf("calculateRangeOffset() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+ }

View File

@ -51,7 +51,7 @@ var (
},
[]string{"cidr"},
)
- // clusterIPAllocation counts the total number of ClusterIP allocation.
+ // clusterIPAllocations counts the total number of ClusterIP allocations, partitioned by allocation mode: static or dynamic.
clusterIPAllocations = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
@ -60,9 +60,9 @@ var (
Help: "Number of Cluster IPs allocations",
StabilityLevel: metrics.ALPHA,
},
[]string{"cidr"},
[]string{"cidr", "scope"},
)
- // clusterIPAllocationErrors counts the number of error trying to allocate a ClusterIP.
+ // clusterIPAllocationErrors counts the number of errors trying to allocate a ClusterIP, partitioned by allocation mode: static or dynamic.
clusterIPAllocationErrors = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
@ -71,7 +71,7 @@ var (
Help: "Number of errors trying to allocate Cluster IPs",
StabilityLevel: metrics.ALPHA,
},
[]string{"cidr"},
[]string{"cidr", "scope"},
)
)
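
With the new scope label, every read of these counter vecs must now supply both label values. A minimal sketch using the same testutil helper the tests above rely on; scopeCounts is a hypothetical helper, not part of this commit:

```go
package ipallocator

import "k8s.io/component-base/metrics/testutil"

// scopeCounts illustrates the label change: callers now pass the cidr plus
// the new scope label ("static" or "dynamic") when reading the counters.
func scopeCounts(cidr string) (static, dynamic float64, err error) {
	static, err = testutil.GetCounterMetricValue(clusterIPAllocations.WithLabelValues(cidr, "static"))
	if err != nil {
		return
	}
	dynamic, err = testutil.GetCounterMetricValue(clusterIPAllocations.WithLabelValues(cidr, "dynamic"))
	return
}
```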

View File

@ -18,6 +18,7 @@ package storage
import (
"context"
"fmt"
"strings"
"testing"
@ -25,8 +26,11 @@ import (
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
+ utilfeature "k8s.io/apiserver/pkg/util/feature"
+ featuregatetesting "k8s.io/component-base/featuregate/testing"
api "k8s.io/kubernetes/pkg/apis/core"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
@ -43,8 +47,8 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa
var backing allocator.Interface
configForAllocations := etcdStorage.ForResource(api.Resource("serviceipallocations"))
- storage, err := ipallocator.New(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
- mem := allocator.NewAllocationMap(max, rangeSpec)
+ storage, err := ipallocator.New(cidr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
+ mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
backing = mem
etcd, err := allocatorstore.NewEtcd(mem, "/ranges/serviceips", configForAllocations)
if err != nil {
@ -115,3 +119,51 @@ func TestStore(t *testing.T) {
t.Fatal(err)
}
}
+ func TestAllocateReserved(t *testing.T) {
+ defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceIPStaticSubrange, true)()
+ _, storage, _, si, destroyFunc := newStorage(t)
+ defer destroyFunc()
+ if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ // allocate all addresses in the dynamic block
+ // subnet /24 = 256 addresses; the static offset is min(max(16,256/16),256) = 16
+ dynamicOffset := 16
+ max := 254
+ dynamicBlockSize := max - dynamicOffset
+ for i := 0; i < dynamicBlockSize; i++ {
+ if _, err := storage.AllocateNext(); err != nil {
+ t.Errorf("Unexpected error trying to allocate: %v", err)
+ }
+ }
+ for i := dynamicOffset; i < max; i++ {
+ ip := fmt.Sprintf("192.168.1.%d", i+1)
+ if !storage.Has(netutils.ParseIPSloppy(ip)) {
+ t.Errorf("IP %s expected to be allocated", ip)
+ }
+ }
+ // allocate all addresses in the static block
+ for i := 0; i < dynamicOffset; i++ {
+ ip := fmt.Sprintf("192.168.1.%d", i+1)
+ if err := storage.Allocate(netutils.ParseIPSloppy(ip)); err != nil {
+ t.Errorf("Unexpected error trying to allocate IP %s: %v", ip, err)
+ }
+ }
+ if _, err := storage.AllocateNext(); err == nil {
+ t.Error("Allocator expected to be full")
+ }
+ // release one address from the static block and verify AllocateNext reuses it
+ if err := storage.Release(netutils.ParseIPSloppy("192.168.1.10")); err != nil {
+ t.Fatalf("Unexpected error trying to release ip 192.168.1.10: %v", err)
+ }
+ if _, err := storage.AllocateNext(); err != nil {
+ t.Error(err)
+ }
+ if _, err := storage.AllocateNext(); err == nil {
+ t.Error("Allocator expected to be full")
+ }
+ }