add metrics to the nodeport allocator repair loop

The repair loop is great for saving us from leaks, but a side effect
is that bugs can go unnoticed for a long time, so we need some
signal to identify those errors proactively.

Add two new metrics to identify:
- errors in the reconcile loop
- errors per nodeport, by type of error (see the sample series below)
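
With the "apiserver" namespace and "nodeport_repair" subsystem used below, the
scraped series on the apiserver's /metrics endpoint should look roughly like
this (values are illustrative):

apiserver_nodeport_repair_port_errors_total{type="duplicate"} 1
apiserver_nodeport_repair_port_errors_total{type="leak"} 3
apiserver_nodeport_repair_reconcile_errors_total 0

A sustained nonzero rate on either series is the proactive signal this change
is after.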
Antonio Ojea 2023-09-23 10:27:35 +00:00
parent 46dea3015f
commit 4eff70dcf9
3 changed files with 164 additions and 1 deletions


@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller

import (
	"sync"

	"k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

const (
	namespace = "apiserver"
	subsystem = "nodeport_repair"
)

var (
	// nodePortRepairPortErrors indicates the number of errors found by the repair loop
	// divided by the type of error:
	// leak, repair, full, outOfRange, duplicate, unknown
	nodePortRepairPortErrors = metrics.NewCounterVec(
		&metrics.CounterOpts{
			Namespace:      namespace,
			Subsystem:      subsystem,
			Name:           "port_errors_total",
			Help:           "Number of errors detected on ports by the repair loop broken down by type of error: leak, repair, full, outOfRange, duplicate, unknown",
			StabilityLevel: metrics.ALPHA,
		},
		[]string{"type"},
	)

	// nodePortRepairReconcileErrors indicates the number of times the repair loop has failed to repair
	// the errors it detected.
	nodePortRepairReconcileErrors = metrics.NewCounter(
		&metrics.CounterOpts{
			Namespace:      namespace,
			Subsystem:      subsystem,
			Name:           "reconcile_errors_total",
			Help:           "Number of reconciliation failures on the nodeport repair reconcile loop",
			StabilityLevel: metrics.ALPHA,
		},
	)
)

var registerMetricsOnce sync.Once

func registerMetrics() {
	registerMetricsOnce.Do(func() {
		legacyregistry.MustRegister(nodePortRepairPortErrors)
		legacyregistry.MustRegister(nodePortRepairReconcileErrors)
	})
}
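
Registration goes through sync.Once because NewRepair can be constructed more
than once (for example in tests), and a second MustRegister of the same
collector would panic. A hypothetical test sketch, not part of this commit,
that pins down that idempotency:

package controller

import "testing"

// TestRegisterMetricsIdempotent is a hypothetical sketch: repeated calls must
// be a no-op after the first, because registration is guarded by sync.Once.
func TestRegisterMetricsIdempotent(t *testing.T) {
	for i := 0; i < 3; i++ {
		registerMetrics() // would panic on a duplicate MustRegister without the Once guard
	}
}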


@ -61,6 +61,8 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient})
	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "portallocator-repair-controller")

	registerMetrics()
	return &Repair{
		interval:      interval,
		serviceClient: serviceClient,
@ -89,7 +91,13 @@ func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
// runOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
-	return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
+	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		err := c.doRunOnce()
+		if err != nil {
+			nodePortRepairReconcileErrors.Inc()
+		}
+		return err
+	})
}

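The change above is a count-then-propagate wrapper: every failed attempt of
the retried function bumps the counter, and the error is returned unchanged so
RetryOnConflict keeps its semantics (note this can increment more than once
per runOnce call if conflicts are retried). A standalone sketch of the
pattern, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

type counter struct{ n int }

func (c *counter) Inc() { c.n++ }

// countFailures is a hypothetical helper mirroring the wrapper in runOnce:
// increment a counter on every failure, then propagate the error unchanged.
func countFailures(fn func() error, inc interface{ Inc() }) func() error {
	return func() error {
		err := fn()
		if err != nil {
			inc.Inc() // one increment per failed attempt
		}
		return err
	}
}

func main() {
	c := &counter{}
	wrapped := countFailures(func() error { return errors.New("boom") }, c)
	_ = wrapped()
	_ = wrapped()
	fmt.Println(c.n) // prints 2
}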
// doRunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
@ -153,23 +161,28 @@ func (c *Repair) doRunOnce() error {
					stored.Release(port)
				} else {
					// doesn't seem to be allocated
					nodePortRepairPortErrors.WithLabelValues("repair").Inc()
					c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortNotAllocated", "PortAllocation", "Port %d is not allocated; repairing", port)
					runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
				}
				delete(c.leaks, port) // it is used, so it can't be leaked
			case portallocator.ErrAllocated:
				// port is duplicate, reallocate
				nodePortRepairPortErrors.WithLabelValues("duplicate").Inc()
				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortAlreadyAllocated", "PortAllocation", "Port %d was assigned to multiple services; please recreate service", port)
				runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
			case err.(*portallocator.ErrNotInRange):
				// port is out of range, reallocate
				nodePortRepairPortErrors.WithLabelValues("outOfRange").Inc()
				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortOutOfRange", "PortAllocation", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
				runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange))
			case portallocator.ErrFull:
				// somehow we are out of ports
				nodePortRepairPortErrors.WithLabelValues("full").Inc()
				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortRangeFull", "PortAllocation", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
				return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange)
			default:
				nodePortRepairPortErrors.WithLabelValues("unknown").Inc()
				c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "UnknownError", "PortAllocation", "Unable to allocate port %d due to an unknown error", port)
				return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
			}
@ -189,9 +202,11 @@ func (c *Repair) doRunOnce() error {
			// pretend it is still in use until count expires
			c.leaks[port] = count - 1
			if err := rebuilt.Allocate(port); err != nil {
				// do not increment the metric here; if it is a leak it will be detected once the counter gets to 0
				runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
			}
		default:
			nodePortRepairPortErrors.WithLabelValues("leak").Inc()
			// do not add it to the rebuilt set, which means it will be available for reuse
			runtime.HandleError(fmt.Errorf("the node port %d appears to have leaked: cleaning up", port))
		}
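
Note that leak handling is a countdown, not an immediate release: a port found
only in the stored allocator keeps its allocation for a few more runs, and is
only counted (and cleaned up) as a leak once the counter expires. A simplified
standalone sketch of that countdown, with an illustrative grace-period value:

package main

import "fmt"

// stillInGracePeriod mimics the countdown in doRunOnce (simplified): a port
// seen without an owning Service keeps its allocation until the count expires.
func stillInGracePeriod(leaks map[int]int, port int) bool {
	const gracePeriod = 3 // illustrative; the real value lives in repair.go
	count, found := leaks[port]
	if !found {
		leaks[port] = gracePeriod // first sighting: start the countdown
		return true
	}
	if count > 0 {
		leaks[port] = count - 1 // pretend it is still in use
		return true
	}
	delete(leaks, port) // countdown expired: treat as leaked and release
	return false
}

func main() {
	leaks := map[int]int{}
	for run := 1; run <= 5; run++ {
		fmt.Printf("run %d: keep=%v\n", run, stillInGracePeriod(leaks, 30080))
	}
}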


@ -27,6 +27,7 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/net"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/component-base/metrics/testutil"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)
@ -53,6 +54,7 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
}

func TestRepair(t *testing.T) {
	clearMetrics()
	fakeClient := fake.NewSimpleClientset()
	registry := &mockRangeRegistry{
		item: &api.RangeAllocation{Range: "100-200"},
@ -66,6 +68,13 @@ func TestRepair(t *testing.T) {
	if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
		t.Errorf("unexpected registry: %#v", registry)
	}

	repairErrors, err := testutil.GetCounterMetricValue(nodePortRepairReconcileErrors)
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairReconcileErrors.Name, err)
	}
	if repairErrors != 0 {
		t.Fatalf("0 error expected, got %v", repairErrors)
	}
	registry = &mockRangeRegistry{
		item: &api.RangeAllocation{Range: "100-200"},
@ -75,9 +84,18 @@ func TestRepair(t *testing.T) {
	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
		t.Fatal(err)
	}

	repairErrors, err = testutil.GetCounterMetricValue(nodePortRepairReconcileErrors)
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairReconcileErrors.Name, err)
	}
	if repairErrors != 1 {
		t.Fatalf("1 error expected, got %v", repairErrors)
	}
}

func TestRepairLeak(t *testing.T) {
	clearMetrics()
	pr, _ := net.ParsePortRange("100-200")
	previous, err := portallocator.NewInMemory(*pr)
	if err != nil {
@ -127,9 +145,18 @@ func TestRepairLeak(t *testing.T) {
	if after.Has(111) {
		t.Errorf("expected portallocator to not have leaked port")
	}

	em := testMetrics{
		leak:       1,
		repair:     0,
		outOfRange: 0,
		duplicate:  0,
		unknown:    0,
	}
	expectMetrics(t, em)
}

func TestRepairWithExisting(t *testing.T) {
	clearMetrics()
	pr, _ := net.ParsePortRange("100-200")
	previous, err := portallocator.NewInMemory(*pr)
	if err != nil {
@ -204,6 +231,14 @@ func TestRepairWithExisting(t *testing.T) {
	if free := after.Free(); free != 97 {
		t.Errorf("unexpected portallocator state: %d free", free)
	}

	em := testMetrics{
		leak:       0,
		repair:     4,
		outOfRange: 1,
		duplicate:  1,
		unknown:    0,
	}
	expectMetrics(t, em)
}

func TestCollectServiceNodePorts(t *testing.T) {
@ -303,3 +338,51 @@ func TestCollectServiceNodePorts(t *testing.T) {
		})
	}
}

// Metrics helpers

func clearMetrics() {
	nodePortRepairPortErrors.Reset()
	nodePortRepairReconcileErrors.Reset()
}

type testMetrics struct {
	leak       float64
	repair     float64
	outOfRange float64
	duplicate  float64
	unknown    float64
	full       float64
}

func expectMetrics(t *testing.T, em testMetrics) {
	var m testMetrics
	var err error

	m.leak, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("leak"))
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
	}
	m.repair, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("repair"))
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
	}
	m.outOfRange, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("outOfRange"))
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
	}
	m.duplicate, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("duplicate"))
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
	}
	m.unknown, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("unknown"))
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
	}
	m.full, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("full"))
	if err != nil {
		t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
	}

	if m != em {
		t.Fatalf("metrics error: expected %v, received %v", em, m)
	}
}