add metrics to the ipallocator repair loop

The repair loop are great for saving us of leaks, but the side effect
is that bugs can go unnoticed for a long time, so we need some
signal to be able to identify those errors proactivily.

Add two new metrics to identify:
- errors on the reconcile loop
- errors per clusterip
This commit is contained in:
Antonio Ojea 2023-09-23 10:51:12 +00:00
parent 4eff70dcf9
commit 566fad5eda
3 changed files with 202 additions and 1 deletions

View File

@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
namespace = "apiserver"
subsystem = "clusterip_repair"
)
var (
// clusterIPRepairIPErrors indicates the number of errors found by the repair loop
// divided by the type of error:
// leak, repair, full, outOfRange, duplicate, invalid, unknown
clusterIPRepairIPErrors = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "ip_errors_total",
Help: "Number of errors detected on clusterips by the repair loop broken down by type of error: leak, repair, full, outOfRange, duplicate, unknown, invalid",
StabilityLevel: metrics.ALPHA,
},
[]string{"type"},
)
// clusterIPRepairReconcileErrors indicates the number of times the repair loop has failed to repair
// the errors it detected.
clusterIPRepairReconcileErrors = metrics.NewCounter(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "reconcile_errors_total",
Help: "Number of reconciliation failures on the clusterip repair reconcile loop",
StabilityLevel: metrics.ALPHA,
},
)
)
var registerMetricsOnce sync.Once
func registerMetrics() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(clusterIPRepairIPErrors)
legacyregistry.MustRegister(clusterIPRepairReconcileErrors)
})
}

View File

@ -101,6 +101,8 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
leaksByFamily[secondary] = make(map[string]int)
}
registerMetrics()
return &Repair{
interval: interval,
serviceClient: serviceClient,
@ -131,7 +133,13 @@ func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
// runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err := c.doRunOnce()
if err != nil {
clusterIPRepairReconcileErrors.Inc()
}
return err
})
}
// doRunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
@ -222,6 +230,7 @@ func (c *Repair) doRunOnce() error {
ip := netutils.ParseIPSloppy(ip)
if ip == nil {
// cluster IP is corrupt
clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate service", ip)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace))
continue
@ -230,6 +239,7 @@ func (c *Repair) doRunOnce() error {
family := getFamilyByIP(ip)
if _, ok := rebuiltByFamily[family]; !ok {
// this service is using an IPFamily no longer configured on cluster
clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace))
continue
@ -245,24 +255,29 @@ func (c *Repair) doRunOnce() error {
actualStored.Release(ip)
} else {
// cluster IP doesn't seem to be allocated
clusterIPRepairIPErrors.WithLabelValues("repair").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace))
}
delete(c.leaksByFamily[family], ip.String()) // it is used, so it can't be leaked
case ipallocator.ErrAllocated:
// cluster IP is duplicate
clusterIPRepairIPErrors.WithLabelValues("duplicate").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace))
case err.(*ipallocator.ErrNotInRange):
// cluster IP is out of range
clusterIPRepairIPErrors.WithLabelValues("outOfRange").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family]))
case ipallocator.ErrFull:
// somehow we are out of IPs
clusterIPRepairIPErrors.WithLabelValues("full").Inc()
cidr := actualAlloc.CIDR()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ServiceCIDRFull", "ClusterIPAllocation", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
default:
clusterIPRepairIPErrors.WithLabelValues("unknown").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
return fmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err)
}
@ -314,9 +329,11 @@ func (c *Repair) checkLeaked(leaks map[string]int, stored ipallocator.Interface,
// pretend it is still in use until count expires
leaks[ip.String()] = count - 1
if err := rebuilt.Allocate(ip); err != nil {
// do not increment the metric here, if it is a leak it will be detected once the counter gets to 0
runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
}
default:
clusterIPRepairIPErrors.WithLabelValues("leak").Inc()
// do not add it to the rebuilt set, which means it will be available for reuse
runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
}

View File

@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
netutils "k8s.io/utils/net"
@ -77,6 +78,8 @@ func TestRepair(t *testing.T) {
}
func TestRepairLeak(t *testing.T) {
clearMetrics()
_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
previous, err := ipallocator.NewInMemory(cidr)
if err != nil {
@ -126,9 +129,21 @@ func TestRepairLeak(t *testing.T) {
if after.Has(netutils.ParseIPSloppy("192.168.1.10")) {
t.Errorf("expected ipallocator to not have leaked IP")
}
em := testMetrics{
leak: 1,
repair: 0,
outOfRange: 0,
duplicate: 0,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
func TestRepairWithExisting(t *testing.T) {
clearMetrics()
_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
previous, err := ipallocator.NewInMemory(cidr)
if err != nil {
@ -213,6 +228,16 @@ func TestRepairWithExisting(t *testing.T) {
if free := after.Free(); free != 252 {
t.Errorf("unexpected ipallocator state: %d free (expected 252)", free)
}
em := testMetrics{
leak: 0,
repair: 2,
outOfRange: 1,
duplicate: 1,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
func makeRangeRegistry(t *testing.T, cidrRange string) *mockRangeRegistry {
@ -323,6 +348,8 @@ func TestShouldWorkOnSecondary(t *testing.T) {
}
func TestRepairDualStack(t *testing.T) {
clearMetrics()
fakeClient := fake.NewSimpleClientset()
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
@ -345,6 +372,14 @@ func TestRepairDualStack(t *testing.T) {
t.Errorf("unexpected ipregistry: %#v", ipregistry)
}
repairErrors, err := testutil.GetCounterMetricValue(clusterIPRepairReconcileErrors)
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairReconcileErrors.Name, err)
}
if repairErrors != 0 {
t.Fatalf("0 error expected, got %v", repairErrors)
}
ipregistry = &mockRangeRegistry{
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
updateErr: fmt.Errorf("test error"),
@ -358,9 +393,17 @@ func TestRepairDualStack(t *testing.T) {
if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
repairErrors, err = testutil.GetCounterMetricValue(clusterIPRepairReconcileErrors)
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairReconcileErrors.Name, err)
}
if repairErrors != 1 {
t.Fatalf("1 error expected, got %v", repairErrors)
}
}
func TestRepairLeakDualStack(t *testing.T) {
clearMetrics()
_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
previous, err := ipallocator.NewInMemory(cidr)
if err != nil {
@ -449,9 +492,22 @@ func TestRepairLeakDualStack(t *testing.T) {
if secondaryAfter.Has(netutils.ParseIPSloppy("2000::1")) {
t.Errorf("expected ipallocator to not have leaked IP")
}
em := testMetrics{
leak: 2,
repair: 0,
outOfRange: 0,
duplicate: 0,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
func TestRepairWithExistingDualStack(t *testing.T) {
clearMetrics()
// because anything (other than allocator) depends
// on families assigned to service (not the value of IPFamilyPolicy)
// we can saftly create tests that has ipFamilyPolicy:nil
@ -621,4 +677,67 @@ func TestRepairWithExistingDualStack(t *testing.T) {
if free := secondaryAfter.Free(); free != 65533 {
t.Errorf("unexpected ipallocator state: %d free (number of free ips is not 65532)", free)
}
em := testMetrics{
leak: 0,
repair: 5,
outOfRange: 6,
duplicate: 3,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
// Metrics helpers
func clearMetrics() {
clusterIPRepairIPErrors.Reset()
clusterIPRepairReconcileErrors.Reset()
}
type testMetrics struct {
leak float64
repair float64
outOfRange float64
full float64
duplicate float64
invalid float64
unknown float64
}
func expectMetrics(t *testing.T, em testMetrics) {
var m testMetrics
var err error
m.leak, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("leak"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.repair, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("repair"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.outOfRange, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("outOfRange"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.duplicate, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("duplicate"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.invalid, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("invalid"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.full, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("full"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.unknown, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("unknown"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
if m != em {
t.Fatalf("metrics error: expected %v, received %v", em, m)
}
}