diff --git a/pkg/registry/core/service/ipallocator/controller/metrics.go b/pkg/registry/core/service/ipallocator/controller/metrics.go
new file mode 100644
index 00000000000..8bfe6b23039
--- /dev/null
+++ b/pkg/registry/core/service/ipallocator/controller/metrics.go
@@ -0,0 +1,65 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+const (
+	namespace = "apiserver"
+	subsystem = "clusterip_repair"
+)
+
+var (
+	// clusterIPRepairIPErrors indicates the number of errors found by the repair loop
+	// divided by the type of error:
+	// leak, repair, full, outOfRange, duplicate, invalid, unknown
+	clusterIPRepairIPErrors = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "ip_errors_total",
+			Help:           "Number of errors detected on clusterips by the repair loop broken down by type of error: leak, repair, full, outOfRange, duplicate, unknown, invalid",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"type"},
+	)
+	// clusterIPRepairReconcileErrors indicates the number of times the repair loop has failed to repair
+	// the errors it detected.
+	clusterIPRepairReconcileErrors = metrics.NewCounter(
+		&metrics.CounterOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "reconcile_errors_total",
+			Help:           "Number of reconciliation failures on the clusterip repair reconcile loop",
+			StabilityLevel: metrics.ALPHA,
+		},
+	)
+)
+
+var registerMetricsOnce sync.Once
+
+func registerMetrics() {
+	registerMetricsOnce.Do(func() {
+		legacyregistry.MustRegister(clusterIPRepairIPErrors)
+		legacyregistry.MustRegister(clusterIPRepairReconcileErrors)
+	})
+}
diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go
index f568e4fdb59..ba4ce975582 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair.go
@@ -101,6 +101,8 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
 		leaksByFamily[secondary] = make(map[string]int)
 	}
 
+	registerMetrics()
+
 	return &Repair{
 		interval:      interval,
 		serviceClient: serviceClient,
@@ -131,7 +133,13 @@ func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
 
 // runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
 func (c *Repair) runOnce() error {
-	return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
+	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		err := c.doRunOnce()
+		if err != nil {
+			clusterIPRepairReconcileErrors.Inc()
+		}
+		return err
+	})
 }
 
 // doRunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
@@ -222,6 +230,7 @@ func (c *Repair) doRunOnce() error {
 			ip := netutils.ParseIPSloppy(ip)
 			if ip == nil {
 				// cluster IP is corrupt
+				clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
 				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate service", ip)
 				runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace))
 				continue
@@ -230,6 +239,7 @@ func (c *Repair) doRunOnce() error {
 			family := getFamilyByIP(ip)
 			if _, ok := rebuiltByFamily[family]; !ok {
 				// this service is using an IPFamily no longer configured on cluster
+				clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
 				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
 				runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace))
 				continue
@@ -245,24 +255,29 @@ func (c *Repair) doRunOnce() error {
 					actualStored.Release(ip)
 				} else {
 					// cluster IP doesn't seem to be allocated
+					clusterIPRepairIPErrors.WithLabelValues("repair").Inc()
 					c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
 					runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace))
 				}
 				delete(c.leaksByFamily[family], ip.String()) // it is used, so it can't be leaked
 			case ipallocator.ErrAllocated:
 				// cluster IP is duplicate
+				clusterIPRepairIPErrors.WithLabelValues("duplicate").Inc()
 				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
 				runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace))
 			case err.(*ipallocator.ErrNotInRange):
 				// cluster IP is out of range
+				clusterIPRepairIPErrors.WithLabelValues("outOfRange").Inc()
 				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
 				runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family]))
 			case ipallocator.ErrFull:
 				// somehow we are out of IPs
+				clusterIPRepairIPErrors.WithLabelValues("full").Inc()
 				cidr := actualAlloc.CIDR()
 				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ServiceCIDRFull", "ClusterIPAllocation", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
 				return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
 			default:
+				clusterIPRepairIPErrors.WithLabelValues("unknown").Inc()
 				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
 				return fmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err)
 			}
@@ -314,9 +329,11 @@ func (c *Repair) checkLeaked(leaks map[string]int, stored ipallocator.Interface,
 			// pretend it is still in use until count expires
 			leaks[ip.String()] = count - 1
 			if err := rebuilt.Allocate(ip); err != nil {
+				// do not increment the metric here, if it is a leak it will be detected once the counter gets to 0
 				runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
 			}
 		default:
+			clusterIPRepairIPErrors.WithLabelValues("leak").Inc()
 			// do not add it to the rebuilt set, which means it will be available for reuse
 			runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
 		}
diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go
index a3d2144ecc3..2fdc06db5aa 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair_test.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go
@@ -25,6 +25,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/component-base/metrics/testutil"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
 	netutils "k8s.io/utils/net"
@@ -77,6 +78,8 @@ func TestRepair(t *testing.T) {
 }
 
 func TestRepairLeak(t *testing.T) {
+	clearMetrics()
+
 	_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
 	previous, err := ipallocator.NewInMemory(cidr)
 	if err != nil {
@@ -126,9 +129,21 @@ func TestRepairLeak(t *testing.T) {
 	if after.Has(netutils.ParseIPSloppy("192.168.1.10")) {
 		t.Errorf("expected ipallocator to not have leaked IP")
 	}
+	em := testMetrics{
+		leak:       1,
+		repair:     0,
+		outOfRange: 0,
+		duplicate:  0,
+		unknown:    0,
+		invalid:    0,
+		full:       0,
+	}
+	expectMetrics(t, em)
 }
 
 func TestRepairWithExisting(t *testing.T) {
+	clearMetrics()
+
 	_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
 	previous, err := ipallocator.NewInMemory(cidr)
 	if err != nil {
@@ -213,6 +228,16 @@ func TestRepairWithExisting(t *testing.T) {
 	if free := after.Free(); free != 252 {
 		t.Errorf("unexpected ipallocator state: %d free (expected 252)", free)
 	}
+	em := testMetrics{
+		leak:       0,
+		repair:     2,
+		outOfRange: 1,
+		duplicate:  1,
+		unknown:    0,
+		invalid:    0,
+		full:       0,
+	}
+	expectMetrics(t, em)
 }
 
 func makeRangeRegistry(t *testing.T, cidrRange string) *mockRangeRegistry {
@@ -323,6 +348,8 @@ func TestShouldWorkOnSecondary(t *testing.T) {
 }
 
 func TestRepairDualStack(t *testing.T) {
+	clearMetrics()
+
 	fakeClient := fake.NewSimpleClientset()
 	ipregistry := &mockRangeRegistry{
 		item: &api.RangeAllocation{Range: "192.168.1.0/24"},
@@ -345,6 +372,14 @@ func TestRepairDualStack(t *testing.T) {
 		t.Errorf("unexpected ipregistry: %#v", ipregistry)
 	}
 
+	repairErrors, err := testutil.GetCounterMetricValue(clusterIPRepairReconcileErrors)
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairReconcileErrors.Name, err)
+	}
+	if repairErrors != 0 {
+		t.Fatalf("0 error expected, got %v", repairErrors)
+	}
+
 	ipregistry = &mockRangeRegistry{
 		item:      &api.RangeAllocation{Range: "192.168.1.0/24"},
 		updateErr: fmt.Errorf("test error"),
@@ -358,9 +393,17 @@ func TestRepairDualStack(t *testing.T) {
 	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
+	repairErrors, err = testutil.GetCounterMetricValue(clusterIPRepairReconcileErrors)
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairReconcileErrors.Name, err)
+	}
+	if repairErrors != 1 {
+		t.Fatalf("1 error expected, got %v", repairErrors)
+	}
 }
 
 func TestRepairLeakDualStack(t *testing.T) {
+	clearMetrics()
 	_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
 	previous, err := ipallocator.NewInMemory(cidr)
 	if err != nil {
@@ -449,9 +492,22 @@ func TestRepairLeakDualStack(t *testing.T) {
 	if secondaryAfter.Has(netutils.ParseIPSloppy("2000::1")) {
 		t.Errorf("expected ipallocator to not have leaked IP")
 	}
+
+	em := testMetrics{
+		leak:       2,
+		repair:     0,
+		outOfRange: 0,
+		duplicate:  0,
+		unknown:    0,
+		invalid:    0,
+		full:       0,
+	}
+	expectMetrics(t, em)
+
 }
 
 func TestRepairWithExistingDualStack(t *testing.T) {
+	clearMetrics()
 	// because anything (other than allocator) depends
 	// on families assigned to service (not the value of IPFamilyPolicy)
 	// we can saftly create tests that has ipFamilyPolicy:nil
@@ -621,4 +677,67 @@ func TestRepairWithExistingDualStack(t *testing.T) {
 	if free := secondaryAfter.Free(); free != 65533 {
 		t.Errorf("unexpected ipallocator state: %d free (number of free ips is not 65532)", free)
 	}
+	em := testMetrics{
+		leak:       0,
+		repair:     5,
+		outOfRange: 6,
+		duplicate:  3,
+		unknown:    0,
+		invalid:    0,
+		full:       0,
+	}
+	expectMetrics(t, em)
+}
+
+// Metrics helpers
+func clearMetrics() {
+	clusterIPRepairIPErrors.Reset()
+	clusterIPRepairReconcileErrors.Reset()
+}
+
+type testMetrics struct {
+	leak       float64
+	repair     float64
+	outOfRange float64
+	full       float64
+	duplicate  float64
+	invalid    float64
+	unknown    float64
+}
+
+func expectMetrics(t *testing.T, em testMetrics) {
+	var m testMetrics
+	var err error
+
+	m.leak, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("leak"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
+	}
+	m.repair, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("repair"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
+	}
+	m.outOfRange, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("outOfRange"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
+	}
+	m.duplicate, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("duplicate"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
+	}
+	m.invalid, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("invalid"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
+	}
+	m.full, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("full"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
+	}
+	m.unknown, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("unknown"))
+	if err != nil {
+		t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
+	}
+	if m != em {
+		t.Fatalf("metrics error: expected %v, received %v", em, m)
+	}
 }