From 4eff70dcf9ff03e7e9d30acb461b161200f16b57 Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Sat, 23 Sep 2023 10:27:35 +0000
Subject: [PATCH] add metrics to the nodeport allocator repair loop

The repair loop is great for saving us from leaks, but the side effect
is that bugs can go unnoticed for a long time, so we need some signal
to be able to identify those errors proactively.

Add two new metrics to identify:

- errors on the reconcile loop
- errors per nodeport
---
 .../portallocator/controller/metrics.go      | 65 +++++++++++++++
 .../portallocator/controller/repair.go       | 17 +++-
 .../portallocator/controller/repair_test.go  | 83 +++++++++++++++++++
 3 files changed, 164 insertions(+), 1 deletion(-)
 create mode 100644 pkg/registry/core/service/portallocator/controller/metrics.go

diff --git a/pkg/registry/core/service/portallocator/controller/metrics.go b/pkg/registry/core/service/portallocator/controller/metrics.go
new file mode 100644
index 00000000000..c6766a9003b
--- /dev/null
+++ b/pkg/registry/core/service/portallocator/controller/metrics.go
@@ -0,0 +1,65 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+const (
+	namespace = "apiserver"
+	subsystem = "nodeport_repair"
+)
+
+var (
+	// nodePortRepairPortErrors indicates the number of errors found by the repair loop
+	// divided by the type of error:
+	// leak, repair, full, outOfRange, duplicate, unknown
+	nodePortRepairPortErrors = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "port_errors_total",
+			Help:           "Number of errors detected on ports by the repair loop broken down by type of error: leak, repair, full, outOfRange, duplicate, unknown",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"type"},
+	)
+	// nodePortRepairReconcileErrors indicates the number of times the repair loop has failed to repair
+	// the errors it detected.
+	nodePortRepairReconcileErrors = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "reconcile_errors_total",
+			Help:           "Number of reconciliation failures on the nodeport repair reconcile loop",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+)
+
+var registerMetricsOnce sync.Once
+
+func registerMetrics() {
+	registerMetricsOnce.Do(func() {
+		legacyregistry.MustRegister(nodePortRepairPortErrors)
+		legacyregistry.MustRegister(nodePortRepairReconcileErrors)
+	})
+}
diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go
index fa870760490..e76695a4c0f 100644
--- a/pkg/registry/core/service/portallocator/controller/repair.go
+++ b/pkg/registry/core/service/portallocator/controller/repair.go
@@ -61,6 +61,8 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
 	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient})
 	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "portallocator-repair-controller")
 
+	registerMetrics()
+
 	return &Repair{
 		interval:      interval,
 		serviceClient: serviceClient,
@@ -89,7 +91,13 @@ func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
 
 // runOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
 func (c *Repair) runOnce() error {
-	return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
+	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		err := c.doRunOnce()
+		if err != nil {
+			nodePortRepairReconcileErrors.Inc()
+		}
+		return err
+	})
 }
 
 // doRunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
@@ -153,23 +161,28 @@ func (c *Repair) doRunOnce() error {
 					stored.Release(port)
 				} else {
 					// doesn't seem to be allocated
+					nodePortRepairPortErrors.WithLabelValues("repair").Inc()
 					c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortNotAllocated", "PortAllocation", "Port %d is not allocated; repairing", port)
 					runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
 				}
 				delete(c.leaks, port) // it is used, so it can't be leaked
 			case portallocator.ErrAllocated:
 				// port is duplicate, reallocate
+				nodePortRepairPortErrors.WithLabelValues("duplicate").Inc()
 				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortAlreadyAllocated", "PortAllocation", "Port %d was assigned to multiple services; please recreate service", port)
 				runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
 			case err.(*portallocator.ErrNotInRange):
 				// port is out of range, reallocate
+				nodePortRepairPortErrors.WithLabelValues("outOfRange").Inc()
 				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortOutOfRange", "PortAllocation", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
 				runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange))
 			case portallocator.ErrFull:
 				// somehow we are out of ports
+				nodePortRepairPortErrors.WithLabelValues("full").Inc()
 				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortRangeFull", "PortAllocation", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
 				return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange)
 			default:
+				nodePortRepairPortErrors.WithLabelValues("unknown").Inc()
 				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "UnknownError", "PortAllocation", "Unable to allocate port %d due to an unknown error", port)
 				return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
 			}
@@ -189,9 +202,11 @@ func (c *Repair) doRunOnce() error {
 			// pretend it is still in use until count expires
 			c.leaks[port] = count - 1
 			if err := rebuilt.Allocate(port); err != nil {
+				// do not increment the metric here, if it is a leak it will be detected once the counter gets to 0
 				runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
 			}
 		default:
+			nodePortRepairPortErrors.WithLabelValues("leak").Inc()
 			// do not add it to the rebuilt set, which means it will be available for reuse
 			runtime.HandleError(fmt.Errorf("the node port %d appears to have leaked: cleaning up", port))
 		}
diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go
index 9869d32fadf..c0f3681f57d 100644
--- a/pkg/registry/core/service/portallocator/controller/repair_test.go
+++ b/pkg/registry/core/service/portallocator/controller/repair_test.go
@@ -27,6 +27,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/component-base/metrics/testutil"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
 )
@@ -53,6 +54,7 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
 }
 
 func TestRepair(t *testing.T) {
+	clearMetrics()
 	fakeClient := fake.NewSimpleClientset()
 	registry := &mockRangeRegistry{
 		item: &api.RangeAllocation{Range: "100-200"},
@@ -66,6 +68,13 @@ func TestRepair(t *testing.T) {
 	if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
 		t.Errorf("unexpected registry: %#v", registry)
 	}
+	repairErrors, err := testutil.GetCounterMetricValue(nodePortRepairReconcileErrors)
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairReconcileErrors.Name, err)
+	}
+	if repairErrors != 0 {
+		t.Fatalf("0 error expected, got %v", repairErrors)
+	}
 
 	registry = &mockRangeRegistry{
 		item: &api.RangeAllocation{Range: "100-200"},
@@ -75,9 +84,18 @@ func TestRepair(t *testing.T) {
 	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
+	repairErrors, err = testutil.GetCounterMetricValue(nodePortRepairReconcileErrors)
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairReconcileErrors.Name, err)
+	}
+	if repairErrors != 1 {
+		t.Fatalf("1 error expected, got %v", repairErrors)
+	}
 }
 
 func TestRepairLeak(t *testing.T) {
+	clearMetrics()
+
 	pr, _ := net.ParsePortRange("100-200")
 	previous, err := portallocator.NewInMemory(*pr)
 	if err != nil {
@@ -127,9 +145,18 @@ func TestRepairLeak(t *testing.T) {
 	if after.Has(111) {
 		t.Errorf("expected portallocator to not have leaked port")
 	}
+	em := testMetrics{
+		leak:       1,
+		repair:     0,
+		outOfRange: 0,
+		duplicate:  0,
+		unknown:    0,
+	}
+	expectMetrics(t, em)
 }
 
 func TestRepairWithExisting(t *testing.T) {
+	clearMetrics()
 	pr, _ := net.ParsePortRange("100-200")
 	previous, err := portallocator.NewInMemory(*pr)
 	if err != nil {
@@ -204,6 +231,14 @@ func TestRepairWithExisting(t *testing.T) {
 	if free := after.Free(); free != 97 {
 		t.Errorf("unexpected portallocator state: %d free", free)
 	}
+	em := testMetrics{
+		leak:       0,
+		repair:     4,
+		outOfRange: 1,
+		duplicate:  1,
+		unknown:    0,
+	}
+	expectMetrics(t, em)
 }
 
 func TestCollectServiceNodePorts(t *testing.T) {
@@ -303,3 +338,51 @@
 		})
 	}
 }
+
+// Metrics helpers
+func clearMetrics() {
+	nodePortRepairPortErrors.Reset()
+	nodePortRepairReconcileErrors.Reset()
+}
+
+type testMetrics struct {
+	leak       float64
+	repair     float64
+	outOfRange float64
+	duplicate  float64
+	unknown    float64
+	full       float64
+}
+
+func expectMetrics(t *testing.T, em testMetrics) {
+	var m testMetrics
+	var err error
+
+	m.leak, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("leak"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
+	}
+	m.repair, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("repair"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
+	}
+	m.outOfRange, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("outOfRange"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
+	}
+	m.duplicate, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("duplicate"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
+	}
+	m.unknown, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("unknown"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
+	}
+	m.full, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("full"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
+	}
+	if m != em {
+		t.Fatalf("metrics error: expected %v, received %v", em, m)
+	}
+}
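
Note (not part of the diff): with the namespace "apiserver" and subsystem "nodeport_repair" used above, the two counters are exposed on the kube-apiserver /metrics endpoint as apiserver_nodeport_repair_port_errors_total (broken down by the "type" label) and apiserver_nodeport_repair_reconcile_errors_total. A rough sketch of what a scrape could return after the repair loop has cleaned up one leaked port; the sample values are illustrative, and the exact HELP/TYPE rendering is whatever the legacy registry emits for ALPHA metrics:

    # HELP apiserver_nodeport_repair_port_errors_total [ALPHA] Number of errors detected on ports by the repair loop broken down by type of error: leak, repair, full, outOfRange, duplicate, unknown
    # TYPE apiserver_nodeport_repair_port_errors_total counter
    apiserver_nodeport_repair_port_errors_total{type="leak"} 1
    # HELP apiserver_nodeport_repair_reconcile_errors_total [ALPHA] Number of reconciliation failures on the nodeport repair reconcile loop
    # TYPE apiserver_nodeport_repair_reconcile_errors_total counter
    apiserver_nodeport_repair_reconcile_errors_total 0

Only label values that have actually been incremented show up for the vector, which is why a single type="leak" series appears in this sketch, while the plain reconcile counter is present at 0 as soon as it is registered.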