diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 1e528c074dc..36c73114a1e 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -250,6 +250,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err) } + serviceNodePortAllocator.EnableMetrics() restStorage.ServiceNodePortAllocator = serviceNodePortRegistry controllerStorage, err := controllerstore.NewStorage(restOptionsGetter) diff --git a/pkg/registry/core/service/portallocator/allocator.go b/pkg/registry/core/service/portallocator/allocator.go index 1c640a2edd3..54837af026d 100644 --- a/pkg/registry/core/service/portallocator/allocator.go +++ b/pkg/registry/core/service/portallocator/allocator.go @@ -37,6 +37,7 @@ type Interface interface { ForEach(func(int)) Has(int) bool Destroy() + EnableMetrics() } var ( @@ -57,6 +58,9 @@ type PortAllocator struct { portRange net.PortRange alloc allocator.Interface + + // metrics is a metrics recorder that can be disabled + metrics metricsRecorderInterface } // PortAllocator implements Interface and Snapshottable @@ -69,6 +73,7 @@ func New(pr net.PortRange, allocatorFactory allocator.AllocatorWithOffsetFactory a := &PortAllocator{ portRange: pr, + metrics: &emptyMetricsRecorder{}, } var offset = 0 @@ -125,6 +130,9 @@ func (r *PortAllocator) Used() int { func (r *PortAllocator) Allocate(port int) error { ok, offset := r.contains(port) if !ok { + // update metrics + r.metrics.incrementAllocationErrors("static") + // include valid port range in error validPorts := r.portRange.String() return &ErrNotInRange{validPorts} @@ -132,11 +140,21 @@ func (r *PortAllocator) Allocate(port int) error { allocated, err := r.alloc.Allocate(offset) if err != nil { + // update metrics + r.metrics.incrementAllocationErrors("static") return err 
} if !allocated { + // update metrics + r.metrics.incrementAllocationErrors("static") return ErrAllocated } + + // update metrics + r.metrics.incrementAllocations("static") + r.metrics.setAllocated(r.Used()) + r.metrics.setAvailable(r.Free()) + return nil } @@ -145,11 +163,19 @@ func (r *PortAllocator) Allocate(port int) error { func (r *PortAllocator) AllocateNext() (int, error) { offset, ok, err := r.alloc.AllocateNext() if err != nil { + r.metrics.incrementAllocationErrors("dynamic") return 0, err } if !ok { + r.metrics.incrementAllocationErrors("dynamic") return 0, ErrFull } + + // update metrics + r.metrics.incrementAllocations("dynamic") + r.metrics.setAllocated(r.Used()) + r.metrics.setAvailable(r.Free()) + return r.portRange.Base + offset, nil } @@ -170,7 +196,13 @@ func (r *PortAllocator) Release(port int) error { return nil } - return r.alloc.Release(offset) + err := r.alloc.Release(offset) + if err == nil { + // update metrics + r.metrics.setAllocated(r.Used()) + r.metrics.setAvailable(r.Free()) + } + return err } // Has returns true if the provided port is already allocated and a call @@ -225,6 +257,12 @@ func (r *PortAllocator) Destroy() { r.alloc.Destroy() } +// EnableMetrics enables metrics recording. +func (r *PortAllocator) EnableMetrics() { + registerMetrics() + r.metrics = &metricsRecorder{} +} + // calculateRangeOffset estimates the offset used on the range for statically allocation based on // the following formula `min(max($min, rangeSize/$step), $max)`, described as ~never less than // $min or more than $max, with a graduated step function between them~. 
The function returns 0 diff --git a/pkg/registry/core/service/portallocator/allocator_test.go b/pkg/registry/core/service/portallocator/allocator_test.go index b78b878640e..37663f0cdc1 100644 --- a/pkg/registry/core/service/portallocator/allocator_test.go +++ b/pkg/registry/core/service/portallocator/allocator_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-base/metrics/testutil" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/features" ) @@ -405,3 +406,269 @@ func Test_calculateRangeOffset(t *testing.T) { }) } } + +func TestNodePortMetrics(t *testing.T) { + clearMetrics() + // create node port allocator + portRange := "30000-32767" + pr, err := net.ParsePortRange(portRange) + if err != nil { + t.Fatal(err) + } + + a, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + a.EnableMetrics() + + // Check initial state + em := testMetrics{ + free: 0, + used: 0, + allocated: 0, + errors: 0, + } + expectMetrics(t, em) + + // allocate 2 ports + found := sets.NewInt() + for i := 0; i < 2; i++ { + port, err := a.AllocateNext() + if err != nil { + t.Fatal(err) + } + if found.Has(port) { + t.Fatalf("already reserved: %d", port) + } + found.Insert(port) + } + + em = testMetrics{ + free: 2768 - 2, + used: 2, + allocated: 2, + errors: 0, + } + expectMetrics(t, em) + + // try to allocate the same ports + for s := range found { + if !a.Has(s) { + t.Fatalf("missing: %d", s) + } + if err := a.Allocate(s); err != ErrAllocated { + t.Fatal(err) + } + } + em = testMetrics{ + free: 2768 - 2, + used: 2, + allocated: 2, + errors: 2, + } + expectMetrics(t, em) + + // release the ports allocated + for s := range found { + if !a.Has(s) { + t.Fatalf("missing: %d", s) + } + if err := a.Release(s); err != nil { + t.Fatal(err) + } + } + em = testMetrics{ + free: 2768, + 
used: 0, + allocated: 2, + errors: 2, + } + expectMetrics(t, em) + + // try to allocate 3000 ports from the allocator + // the full range (2768) succeeds and the remaining 232 attempts fail (2768 + 232 = 3000) + for i := 0; i < 3000; i++ { + a.AllocateNext() + } + em = testMetrics{ + free: 0, + used: 2768, + allocated: 2768 + 2, // this is a counter, we already had 2 allocations and we did 2768 more + errors: 232 + 2, // this is a counter, we already had 2 errors and we did 232 more + } + expectMetrics(t, em) +} + +func TestNodePortAllocatedMetrics(t *testing.T) { + clearMetrics() + + // create NodePort allocator + portRange := "30000-32767" + pr, err := net.ParsePortRange(portRange) + if err != nil { + t.Fatal(err) + } + + a, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + a.EnableMetrics() + + em := testMetrics{ + free: 0, + used: 0, + allocated: 0, + errors: 0, + } + expectMetrics(t, em) + + // allocate 2 ports dynamically + found := sets.NewInt() + for i := 0; i < 2; i++ { + port, err := a.AllocateNext() + if err != nil { + t.Fatal(err) + } + if found.Has(port) { + t.Fatalf("already reserved: %d", port) + } + found.Insert(port) + } + + dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err) + } + if dynamicAllocated != 2 { + t.Fatalf("Expected 2 received %f", dynamicAllocated) + } + + // try to allocate the same ports + for s := range found { + if !a.Has(s) { + t.Fatalf("missing: %d", s) + } + if err := a.Allocate(s); err != ErrAllocated { + t.Fatal(err) + } + } + + staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err) + } + if staticErrors != 2 { + t.Fatalf("Expected 2 received %f", staticErrors) + } +} + +func TestMetricsDisabled(t *testing.T) { + 
clearMetrics() + + // create NodePort allocator + portRange := "30000-32766" + pr, err := net.ParsePortRange(portRange) + if err != nil { + t.Fatal(err) + } + + a, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + a.EnableMetrics() + + // create an allocator with metrics disabled over the same port range + // its metrics should be ignored + b, err := NewInMemory(*pr) + if err != nil { + t.Fatalf("unexpected error creating nodeport allocator: %v", err) + } + + // Check initial state + em := testMetrics{ + free: 0, + used: 0, + allocated: 0, + errors: 0, + } + expectMetrics(t, em) + + // allocate in the metrics-enabled allocator + for i := 0; i < 100; i++ { + _, err := a.AllocateNext() + if err != nil { + t.Fatal(err) + } + } + em = testMetrics{ + free: 2767 - 100, + used: 100, + allocated: 100, + errors: 0, + } + expectMetrics(t, em) + + // allocate in the metrics-disabled allocator + for i := 0; i < 200; i++ { + _, err := b.AllocateNext() + if err != nil { + t.Fatal(err) + } + } + // the metrics should not be changed + expectMetrics(t, em) +} + +// Metrics helpers +func clearMetrics() { + nodePortAllocated.Set(0) + nodePortAvailable.Set(0) + nodePortAllocations.Reset() + nodePortAllocationErrors.Reset() +} + +type testMetrics struct { + free float64 + used float64 + allocated float64 + errors float64 +} + +func expectMetrics(t *testing.T, em testMetrics) { + var m testMetrics + var err error + m.free, err = testutil.GetGaugeMetricValue(nodePortAvailable) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAvailable.Name, err) + } + m.used, err = testutil.GetGaugeMetricValue(nodePortAllocated) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocated.Name, err) + } + staticAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("static")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err) + } + 
staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err) + } + dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err) + } + dynamicErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("dynamic")) + if err != nil { + t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err) + } + + m.allocated = staticAllocated + dynamicAllocated + m.errors = staticErrors + dynamicErrors + + if m != em { + t.Fatalf("metrics error: expected %v, received %v", em, m) + } +} diff --git a/pkg/registry/core/service/portallocator/metrics.go b/pkg/registry/core/service/portallocator/metrics.go new file mode 100644 index 00000000000..b2efa4ce1bb --- /dev/null +++ b/pkg/registry/core/service/portallocator/metrics.go @@ -0,0 +1,120 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package portallocator + +import ( + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + namespace = "kube_apiserver" + subsystem = "nodeport_allocator" +) + +var ( + // nodePortAllocated indicates the amount of ports allocated by NodePort Service. 
+ nodePortAllocated = metrics.NewGauge( + &metrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "allocated_ports", + Help: "Gauge measuring the number of allocated NodePorts for Services", + StabilityLevel: metrics.ALPHA, + }, + ) + // nodePortAvailable indicates the number of ports still available for NodePort Services. + nodePortAvailable = metrics.NewGauge( + &metrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "available_ports", + Help: "Gauge measuring the number of available NodePorts for Services", + StabilityLevel: metrics.ALPHA, + }, + ) + // nodePortAllocations counts the total number of port allocations, labeled by allocation mode: static or dynamic. + nodePortAllocations = metrics.NewCounterVec( + &metrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "allocation_total", + Help: "Number of NodePort allocations", + StabilityLevel: metrics.ALPHA, + }, + []string{"scope"}, + ) + // nodePortAllocationErrors counts the number of errors trying to allocate a NodePort, labeled by allocation mode: static or dynamic. + nodePortAllocationErrors = metrics.NewCounterVec( + &metrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "allocation_errors_total", + Help: "Number of errors trying to allocate NodePort", + StabilityLevel: metrics.ALPHA, + }, + []string{"scope"}, + ) +) + +var registerMetricsOnce sync.Once + +func registerMetrics() { + registerMetricsOnce.Do(func() { + legacyregistry.MustRegister(nodePortAllocated) + legacyregistry.MustRegister(nodePortAvailable) + legacyregistry.MustRegister(nodePortAllocations) + legacyregistry.MustRegister(nodePortAllocationErrors) + }) +} + +// metricsRecorderInterface is the interface to record metrics. +type metricsRecorderInterface interface { + setAllocated(allocated int) + setAvailable(available int) + incrementAllocations(scope string) + incrementAllocationErrors(scope string) +} + +// metricsRecorder implements metricsRecorderInterface. 
+type metricsRecorder struct{} + +func (m *metricsRecorder) setAllocated(allocated int) { + nodePortAllocated.Set(float64(allocated)) +} + +func (m *metricsRecorder) setAvailable(available int) { + nodePortAvailable.Set(float64(available)) +} + +func (m *metricsRecorder) incrementAllocations(scope string) { + nodePortAllocations.WithLabelValues(scope).Inc() +} + +func (m *metricsRecorder) incrementAllocationErrors(scope string) { + nodePortAllocationErrors.WithLabelValues(scope).Inc() +} + +// emptyMetricsRecorder is a null object that implements metricsRecorderInterface. +type emptyMetricsRecorder struct{} + +func (*emptyMetricsRecorder) setAllocated(allocated int) {} +func (*emptyMetricsRecorder) setAvailable(available int) {} +func (*emptyMetricsRecorder) incrementAllocations(scope string) {} +func (*emptyMetricsRecorder) incrementAllocationErrors(scope string) {} diff --git a/pkg/registry/core/service/portallocator/storage/storage_test.go b/pkg/registry/core/service/portallocator/storage/storage_test.go index b9a417f255f..6bf30bbeee2 100644 --- a/pkg/registry/core/service/portallocator/storage/storage_test.go +++ b/pkg/registry/core/service/portallocator/storage/storage_test.go @@ -216,7 +216,7 @@ func TestAllocateReserved(t *testing.T) { for i := 0; i < dynamicOffset; i++ { port := i + basePortRange if err := storage.Allocate(port); err != nil { - t.Errorf("Unexpected error trying to allocate IP %d: %v", port, err) + t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err) } } if _, err := storage.AllocateNext(); err == nil { @@ -224,7 +224,7 @@ func TestAllocateReserved(t *testing.T) { } // release one port in the allocated block and another a new one randomly if err := storage.Release(basePortRange + 53); err != nil { - t.Fatalf("Unexpected error trying to release ip 30053: %v", err) + t.Fatalf("Unexpected error trying to release port 30053: %v", err) } if _, err := storage.AllocateNext(); err != nil { t.Error(err)