add kube_apiserver_nodeport_allocator_* to improve observability of ServiceNodePortStaticSubrange

This commit is contained in:
xuzhenglun 2022-12-12 20:15:51 +08:00
parent c18c6e1b87
commit d48dd100bf
No known key found for this signature in database
GPG Key ID: 43BD51D11716234A
5 changed files with 429 additions and 3 deletions

View File

@ -250,6 +250,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
}
serviceNodePortAllocator.EnableMetrics()
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)

View File

@ -37,6 +37,7 @@ type Interface interface {
ForEach(func(int))
Has(int) bool
Destroy()
EnableMetrics()
}
var (
@ -57,6 +58,9 @@ type PortAllocator struct {
portRange net.PortRange
alloc allocator.Interface
// metrics is a metrics recorder that can be disabled
metrics metricsRecorderInterface
}
// PortAllocator implements Interface and Snapshottable
@ -69,6 +73,7 @@ func New(pr net.PortRange, allocatorFactory allocator.AllocatorWithOffsetFactory
a := &PortAllocator{
portRange: pr,
metrics: &emptyMetricsRecorder{},
}
var offset = 0
@ -125,6 +130,9 @@ func (r *PortAllocator) Used() int {
func (r *PortAllocator) Allocate(port int) error {
ok, offset := r.contains(port)
if !ok {
// update metrics
r.metrics.incrementAllocationErrors("static")
// include valid port range in error
validPorts := r.portRange.String()
return &ErrNotInRange{validPorts}
@ -132,11 +140,21 @@ func (r *PortAllocator) Allocate(port int) error {
allocated, err := r.alloc.Allocate(offset)
if err != nil {
// update metrics
r.metrics.incrementAllocationErrors("static")
return err
}
if !allocated {
// update metrics
r.metrics.incrementAllocationErrors("static")
return ErrAllocated
}
// update metrics
r.metrics.incrementAllocations("static")
r.metrics.setAllocated(r.Used())
r.metrics.setAvailable(r.Free())
return nil
}
@ -145,11 +163,19 @@ func (r *PortAllocator) Allocate(port int) error {
func (r *PortAllocator) AllocateNext() (int, error) {
offset, ok, err := r.alloc.AllocateNext()
if err != nil {
r.metrics.incrementAllocationErrors("dynamic")
return 0, err
}
if !ok {
r.metrics.incrementAllocationErrors("dynamic")
return 0, ErrFull
}
// update metrics
r.metrics.incrementAllocations("dynamic")
r.metrics.setAllocated(r.Used())
r.metrics.setAvailable(r.Free())
return r.portRange.Base + offset, nil
}
@ -170,7 +196,13 @@ func (r *PortAllocator) Release(port int) error {
return nil
}
return r.alloc.Release(offset)
err := r.alloc.Release(offset)
if err == nil {
// update metrics
r.metrics.setAllocated(r.Used())
r.metrics.setAvailable(r.Free())
}
return err
}
// Has returns true if the provided port is already allocated and a call
@ -225,6 +257,12 @@ func (r *PortAllocator) Destroy() {
r.alloc.Destroy()
}
// EnableMetrics enables metrics recording.
func (r *PortAllocator) EnableMetrics() {
registerMetrics()
r.metrics = &metricsRecorder{}
}
// calculateRangeOffset estimates the offset used on the range for statically allocation based on
// the following formula `min(max($min, rangeSize/$step), $max)`, described as ~never less than
// $min or more than $max, with a graduated step function between them~. The function returns 0

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
)
@ -405,3 +406,269 @@ func Test_calculateRangeOffset(t *testing.T) {
})
}
}
func TestNodePortMetrics(t *testing.T) {
clearMetrics()
// create node port allocator
portRange := "30000-32767"
pr, err := net.ParsePortRange(portRange)
if err != nil {
t.Fatal(err)
}
a, err := NewInMemory(*pr)
if err != nil {
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
}
a.EnableMetrics()
// Check initial state
em := testMetrics{
free: 0,
used: 0,
allocated: 0,
errors: 0,
}
expectMetrics(t, em)
// allocate 2 ports
found := sets.NewInt()
for i := 0; i < 2; i++ {
port, err := a.AllocateNext()
if err != nil {
t.Fatal(err)
}
if found.Has(port) {
t.Fatalf("already reserved: %d", port)
}
found.Insert(port)
}
em = testMetrics{
free: 2768 - 2,
used: 2,
allocated: 2,
errors: 0,
}
expectMetrics(t, em)
// try to allocate the same ports
for s := range found {
if !a.Has(s) {
t.Fatalf("missing: %d", s)
}
if err := a.Allocate(s); err != ErrAllocated {
t.Fatal(err)
}
}
em = testMetrics{
free: 2768 - 2,
used: 2,
allocated: 2,
errors: 2,
}
expectMetrics(t, em)
// release the ports allocated
for s := range found {
if !a.Has(s) {
t.Fatalf("missing: %d", s)
}
if err := a.Release(s); err != nil {
t.Fatal(err)
}
}
em = testMetrics{
free: 2768,
used: 0,
allocated: 2,
errors: 2,
}
expectMetrics(t, em)
// allocate 3000 ports for each allocator
// the full range and 232 more (2768 + 232 = 3000)
for i := 0; i < 3000; i++ {
a.AllocateNext()
}
em = testMetrics{
free: 0,
used: 2768,
allocated: 2768 + 2, // this is a counter, we already had 2 allocations and we did 2768 more
errors: 232 + 2, // this is a counter, we already had 2 errors and we did 232 more
}
expectMetrics(t, em)
}
func TestNodePortAllocatedMetrics(t *testing.T) {
clearMetrics()
// create NodePort allocator
portRange := "30000-32767"
pr, err := net.ParsePortRange(portRange)
if err != nil {
t.Fatal(err)
}
a, err := NewInMemory(*pr)
if err != nil {
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
}
a.EnableMetrics()
em := testMetrics{
free: 0,
used: 0,
allocated: 0,
errors: 0,
}
expectMetrics(t, em)
// allocate 2 dynamic port
found := sets.NewInt()
for i := 0; i < 2; i++ {
port, err := a.AllocateNext()
if err != nil {
t.Fatal(err)
}
if found.Has(port) {
t.Fatalf("already reserved: %d", port)
}
found.Insert(port)
}
dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err)
}
if dynamicAllocated != 2 {
t.Fatalf("Expected 2 received %f", dynamicAllocated)
}
// try to allocate the same ports
for s := range found {
if !a.Has(s) {
t.Fatalf("missing: %d", s)
}
if err := a.Allocate(s); err != ErrAllocated {
t.Fatal(err)
}
}
staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err)
}
if staticErrors != 2 {
t.Fatalf("Expected 2 received %f", staticErrors)
}
}
func TestMetricsDisabled(t *testing.T) {
clearMetrics()
// create NodePort allocator
portRange := "30000-32766"
pr, err := net.ParsePortRange(portRange)
if err != nil {
t.Fatal(err)
}
a, err := NewInMemory(*pr)
if err != nil {
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
}
a.EnableMetrics()
// create metrics disabled allocator with same port range
// this metrics should be ignored
b, err := NewInMemory(*pr)
if err != nil {
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
}
// Check initial state
em := testMetrics{
free: 0,
used: 0,
allocated: 0,
errors: 0,
}
expectMetrics(t, em)
// allocate in metrics enabled allocator
for i := 0; i < 100; i++ {
_, err := a.AllocateNext()
if err != nil {
t.Fatal(err)
}
}
em = testMetrics{
free: 2767 - 100,
used: 100,
allocated: 100,
errors: 0,
}
expectMetrics(t, em)
// allocate in metrics disabled allocator
for i := 0; i < 200; i++ {
_, err := b.AllocateNext()
if err != nil {
t.Fatal(err)
}
}
// the metrics should not be changed
expectMetrics(t, em)
}
// Metrics helpers
func clearMetrics() {
nodePortAllocated.Set(0)
nodePortAvailable.Set(0)
nodePortAllocations.Reset()
nodePortAllocationErrors.Reset()
}
type testMetrics struct {
free float64
used float64
allocated float64
errors float64
}
func expectMetrics(t *testing.T, em testMetrics) {
var m testMetrics
var err error
m.free, err = testutil.GetGaugeMetricValue(nodePortAvailable)
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAvailable.Name, err)
}
m.used, err = testutil.GetGaugeMetricValue(nodePortAllocated)
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAllocated.Name, err)
}
staticAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("static"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err)
}
staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err)
}
dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err)
}
dynamicErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("dynamic"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err)
}
m.allocated = staticAllocated + dynamicAllocated
m.errors = staticErrors + dynamicErrors
if m != em {
t.Fatalf("metrics error: expected %v, received %v", em, m)
}
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portallocator
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
namespace = "kube_apiserver"
subsystem = "nodeport_allocator"
)
var (
// nodePortAllocated indicates the amount of ports allocated by NodePort Service.
nodePortAllocated = metrics.NewGauge(
&metrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "allocated_ports",
Help: "Gauge measuring the number of allocated NodePorts for Services",
StabilityLevel: metrics.ALPHA,
},
)
// nodePortAvailable indicates the amount of ports available by NodePort Service.
nodePortAvailable = metrics.NewGauge(
&metrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "available_ports",
Help: "Gauge measuring the number of available NodePorts for Services",
StabilityLevel: metrics.ALPHA,
},
)
// nodePortAllocation counts the total number of ports allocation and allocation mode: static or dynamic.
nodePortAllocations = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "allocation_total",
Help: "Number of NodePort allocations",
StabilityLevel: metrics.ALPHA,
},
[]string{"scope"},
)
// nodePortAllocationErrors counts the number of error trying to allocate a nodePort and allocation mode: static or dynamic.
nodePortAllocationErrors = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "allocation_errors_total",
Help: "Number of errors trying to allocate NodePort",
StabilityLevel: metrics.ALPHA,
},
[]string{"scope"},
)
)
var registerMetricsOnce sync.Once
func registerMetrics() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(nodePortAllocated)
legacyregistry.MustRegister(nodePortAvailable)
legacyregistry.MustRegister(nodePortAllocations)
legacyregistry.MustRegister(nodePortAllocationErrors)
})
}
// metricsRecorderInterface is the interface to record metrics.
type metricsRecorderInterface interface {
setAllocated(allocated int)
setAvailable(available int)
incrementAllocations(scope string)
incrementAllocationErrors(scope string)
}
// metricsRecorder implements metricsRecorderInterface.
type metricsRecorder struct{}
func (m *metricsRecorder) setAllocated(allocated int) {
nodePortAllocated.Set(float64(allocated))
}
func (m *metricsRecorder) setAvailable(available int) {
nodePortAvailable.Set(float64(available))
}
func (m *metricsRecorder) incrementAllocations(scope string) {
nodePortAllocations.WithLabelValues(scope).Inc()
}
func (m *metricsRecorder) incrementAllocationErrors(scope string) {
nodePortAllocationErrors.WithLabelValues(scope).Inc()
}
// emptyMetricsRecorder is a null object implements metricsRecorderInterface.
type emptyMetricsRecorder struct{}
func (*emptyMetricsRecorder) setAllocated(allocated int) {}
func (*emptyMetricsRecorder) setAvailable(available int) {}
func (*emptyMetricsRecorder) incrementAllocations(scope string) {}
func (*emptyMetricsRecorder) incrementAllocationErrors(scope string) {}

View File

@ -216,7 +216,7 @@ func TestAllocateReserved(t *testing.T) {
for i := 0; i < dynamicOffset; i++ {
port := i + basePortRange
if err := storage.Allocate(port); err != nil {
t.Errorf("Unexpected error trying to allocate IP %d: %v", port, err)
t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err)
}
}
if _, err := storage.AllocateNext(); err == nil {
@ -224,7 +224,7 @@ func TestAllocateReserved(t *testing.T) {
}
// release one port in the allocated block and another a new one randomly
if err := storage.Release(basePortRange + 53); err != nil {
t.Fatalf("Unexpected error trying to release ip 30053: %v", err)
t.Fatalf("Unexpected error trying to release port 30053: %v", err)
}
if _, err := storage.AllocateNext(); err != nil {
t.Error(err)