Merge pull request #120843 from aojea/repair_metrics

Add metrics to the allocators repair loops
Kubernetes Prow Robot 2023-10-01 15:58:44 -07:00 committed by GitHub
commit e8abe1af8d
6 changed files with 366 additions and 2 deletions
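
Once registered, the new counters are exported through the standard apiserver metrics endpoint. A hedged sketch of the series the CounterOpts below should produce (names derived from the namespace/subsystem/name fields; values are illustrative, not from a real scrape):

apiserver_clusterip_repair_ip_errors_total{type="leak"} 1
apiserver_clusterip_repair_reconcile_errors_total 0
apiserver_nodeport_repair_port_errors_total{type="duplicate"} 1
apiserver_nodeport_repair_reconcile_errors_total 0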

View File

@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
namespace = "apiserver"
subsystem = "clusterip_repair"
)
var (
// clusterIPRepairIPErrors indicates the number of errors found by the repair loop
// divided by the type of error:
// leak, repair, full, outOfRange, duplicate, invalid, unknown
clusterIPRepairIPErrors = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "ip_errors_total",
Help: "Number of errors detected on clusterips by the repair loop broken down by type of error: leak, repair, full, outOfRange, duplicate, unknown, invalid",
StabilityLevel: metrics.ALPHA,
},
[]string{"type"},
)
// clusterIPRepairReconcileErrors indicates the number of times the repair loop has failed to repair
// the errors it detected.
clusterIPRepairReconcileErrors = metrics.NewCounter(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "reconcile_errors_total",
Help: "Number of reconciliation failures on the clusterip repair reconcile loop",
StabilityLevel: metrics.ALPHA,
},
)
)
var registerMetricsOnce sync.Once
func registerMetrics() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(clusterIPRepairIPErrors)
legacyregistry.MustRegister(clusterIPRepairReconcileErrors)
})
}
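
A note on the registration pattern: registerMetrics() is called from NewRepair (next file), which can run more than once per process, for example in the tests further down; wrapping the MustRegister calls in sync.Once keeps repeated construction from re-registering the collectors. A minimal, self-contained sketch of that pattern, with hypothetical names:

package main

import (
	"fmt"
	"sync"
)

var registerOnce sync.Once // guards the one-time registration

// register runs fn the first time it is called and is a no-op afterwards,
// so every constructor can call it without coordinating with the others.
func register(fn func()) {
	registerOnce.Do(fn)
}

func main() {
	registrations := 0
	for i := 0; i < 3; i++ { // e.g. three constructor calls
		register(func() { registrations++ })
	}
	fmt.Println(registrations) // prints 1
}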

View File

@ -101,6 +101,8 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
leaksByFamily[secondary] = make(map[string]int)
}
registerMetrics()
return &Repair{
interval: interval,
serviceClient: serviceClient,
@ -131,7 +133,13 @@ func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
// runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err := c.doRunOnce()
if err != nil {
clusterIPRepairReconcileErrors.Inc()
}
return err
})
}
// doRunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
@ -222,6 +230,7 @@ func (c *Repair) doRunOnce() error {
ip := netutils.ParseIPSloppy(ip)
if ip == nil {
// cluster IP is corrupt
clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate service", ip)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace))
continue
@ -230,6 +239,7 @@ func (c *Repair) doRunOnce() error {
family := getFamilyByIP(ip)
if _, ok := rebuiltByFamily[family]; !ok {
// this service is using an IPFamily no longer configured on cluster
clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace))
continue
@ -245,24 +255,29 @@ func (c *Repair) doRunOnce() error {
actualStored.Release(ip)
} else {
// cluster IP doesn't seem to be allocated
clusterIPRepairIPErrors.WithLabelValues("repair").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace))
}
delete(c.leaksByFamily[family], ip.String()) // it is used, so it can't be leaked
case ipallocator.ErrAllocated:
// cluster IP is duplicate
clusterIPRepairIPErrors.WithLabelValues("duplicate").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace))
case err.(*ipallocator.ErrNotInRange):
// cluster IP is out of range
clusterIPRepairIPErrors.WithLabelValues("outOfRange").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family]))
case ipallocator.ErrFull:
// somehow we are out of IPs
clusterIPRepairIPErrors.WithLabelValues("full").Inc()
cidr := actualAlloc.CIDR()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ServiceCIDRFull", "ClusterIPAllocation", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
default:
clusterIPRepairIPErrors.WithLabelValues("unknown").Inc()
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
return fmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err)
}
@ -314,9 +329,11 @@ func (c *Repair) checkLeaked(leaks map[string]int, stored ipallocator.Interface,
// pretend it is still in use until count expires
leaks[ip.String()] = count - 1
if err := rebuilt.Allocate(ip); err != nil {
// do not increment the metric here, if it is a leak it will be detected once the counter gets to 0
runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
}
default:
clusterIPRepairIPErrors.WithLabelValues("leak").Inc()
// do not add it to the rebuilt set, which means it will be available for reuse
runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
}

View File

@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
@ -77,6 +78,8 @@ func TestRepair(t *testing.T) {
}
func TestRepairLeak(t *testing.T) {
clearMetrics()
_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
previous, err := ipallocator.NewInMemory(cidr)
if err != nil {
@ -126,9 +129,21 @@ func TestRepairLeak(t *testing.T) {
if after.Has(netutils.ParseIPSloppy("192.168.1.10")) {
t.Errorf("expected ipallocator to not have leaked IP")
}
em := testMetrics{
leak: 1,
repair: 0,
outOfRange: 0,
duplicate: 0,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
func TestRepairWithExisting(t *testing.T) {
clearMetrics()
_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
previous, err := ipallocator.NewInMemory(cidr)
if err != nil {
@ -213,6 +228,16 @@ func TestRepairWithExisting(t *testing.T) {
if free := after.Free(); free != 252 {
t.Errorf("unexpected ipallocator state: %d free (expected 252)", free)
}
em := testMetrics{
leak: 0,
repair: 2,
outOfRange: 1,
duplicate: 1,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
func makeRangeRegistry(t *testing.T, cidrRange string) *mockRangeRegistry {
@ -323,6 +348,8 @@ func TestShouldWorkOnSecondary(t *testing.T) {
}
func TestRepairDualStack(t *testing.T) {
clearMetrics()
fakeClient := fake.NewSimpleClientset()
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
@ -345,6 +372,14 @@ func TestRepairDualStack(t *testing.T) {
t.Errorf("unexpected ipregistry: %#v", ipregistry) t.Errorf("unexpected ipregistry: %#v", ipregistry)
} }
repairErrors, err := testutil.GetCounterMetricValue(clusterIPRepairReconcileErrors)
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairReconcileErrors.Name, err)
}
if repairErrors != 0 {
t.Fatalf("0 error expected, got %v", repairErrors)
}
ipregistry = &mockRangeRegistry{ ipregistry = &mockRangeRegistry{
item: &api.RangeAllocation{Range: "192.168.1.0/24"}, item: &api.RangeAllocation{Range: "192.168.1.0/24"},
updateErr: fmt.Errorf("test error"), updateErr: fmt.Errorf("test error"),
@ -358,9 +393,17 @@ func TestRepairDualStack(t *testing.T) {
if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
repairErrors, err = testutil.GetCounterMetricValue(clusterIPRepairReconcileErrors)
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairReconcileErrors.Name, err)
}
if repairErrors != 1 {
t.Fatalf("1 error expected, got %v", repairErrors)
}
}
func TestRepairLeakDualStack(t *testing.T) {
clearMetrics()
_, cidr, _ := netutils.ParseCIDRSloppy("192.168.1.0/24")
previous, err := ipallocator.NewInMemory(cidr)
if err != nil {
@ -449,9 +492,22 @@ func TestRepairLeakDualStack(t *testing.T) {
if secondaryAfter.Has(netutils.ParseIPSloppy("2000::1")) {
t.Errorf("expected ipallocator to not have leaked IP")
}
em := testMetrics{
leak: 2,
repair: 0,
outOfRange: 0,
duplicate: 0,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
func TestRepairWithExistingDualStack(t *testing.T) {
clearMetrics()
// because anything (other than allocator) depends
// on families assigned to service (not the value of IPFamilyPolicy)
// we can safely create tests that have ipFamilyPolicy:nil
@ -621,4 +677,67 @@ func TestRepairWithExistingDualStack(t *testing.T) {
if free := secondaryAfter.Free(); free != 65533 {
t.Errorf("unexpected ipallocator state: %d free (number of free ips is not 65532)", free)
}
em := testMetrics{
leak: 0,
repair: 5,
outOfRange: 6,
duplicate: 3,
unknown: 0,
invalid: 0,
full: 0,
}
expectMetrics(t, em)
}
// Metrics helpers
func clearMetrics() {
clusterIPRepairIPErrors.Reset()
clusterIPRepairReconcileErrors.Reset()
}
type testMetrics struct {
leak float64
repair float64
outOfRange float64
full float64
duplicate float64
invalid float64
unknown float64
}
func expectMetrics(t *testing.T, em testMetrics) {
var m testMetrics
var err error
m.leak, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("leak"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.repair, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("repair"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.outOfRange, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("outOfRange"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.duplicate, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("duplicate"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.invalid, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("invalid"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.full, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("full"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
m.unknown, err = testutil.GetCounterMetricValue(clusterIPRepairIPErrors.WithLabelValues("unknown"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", clusterIPRepairIPErrors.Name, err)
}
if m != em {
t.Fatalf("metrics error: expected %v, received %v", em, m)
}
}

View File

@ -0,0 +1,65 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
namespace = "apiserver"
subsystem = "nodeport_repair"
)
var (
// nodePortRepairPortErrors indicates the number of errors found by the repair loop
// divided by the type of error:
// leak, repair, full, outOfRange, duplicate, unknown
nodePortRepairPortErrors = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "port_errors_total",
Help: "Number of errors detected on ports by the repair loop broken down by type of error: leak, repair, full, outOfRange, duplicate, unknown",
StabilityLevel: metrics.ALPHA,
},
[]string{"type"},
)
// nodePortRepairReconcileErrors indicates the number of times the repair loop has failed to repair
// the errors it detected.
nodePortRepairReconcileErrors = metrics.NewCounter(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "reconcile_errors_total",
Help: "Number of reconciliation failures on the nodeport repair reconcile loop",
StabilityLevel: metrics.ALPHA,
},
)
)
var registerMetricsOnce sync.Once
func registerMetrics() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(nodePortRepairPortErrors)
legacyregistry.MustRegister(nodePortRepairReconcileErrors)
})
}

View File

@ -61,6 +61,8 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "portallocator-repair-controller")
registerMetrics()
return &Repair{
interval: interval,
serviceClient: serviceClient,
@ -89,7 +91,13 @@ func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
// runOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) runOnce() error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err := c.doRunOnce()
if err != nil {
nodePortRepairReconcileErrors.Inc()
}
return err
})
}
// doRunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
@ -153,23 +161,28 @@ func (c *Repair) doRunOnce() error {
stored.Release(port)
} else {
// doesn't seem to be allocated
nodePortRepairPortErrors.WithLabelValues("repair").Inc()
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortNotAllocated", "PortAllocation", "Port %d is not allocated; repairing", port)
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
}
delete(c.leaks, port) // it is used, so it can't be leaked
case portallocator.ErrAllocated:
// port is duplicate, reallocate
nodePortRepairPortErrors.WithLabelValues("duplicate").Inc()
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortAlreadyAllocated", "PortAllocation", "Port %d was assigned to multiple services; please recreate service", port)
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
case err.(*portallocator.ErrNotInRange):
// port is out of range, reallocate
nodePortRepairPortErrors.WithLabelValues("outOfRange").Inc()
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortOutOfRange", "PortAllocation", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange))
case portallocator.ErrFull:
// somehow we are out of ports
nodePortRepairPortErrors.WithLabelValues("full").Inc()
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortRangeFull", "PortAllocation", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange)
default:
nodePortRepairPortErrors.WithLabelValues("unknown").Inc()
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "UnknownError", "PortAllocation", "Unable to allocate port %d due to an unknown error", port)
return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
}
@ -189,9 +202,11 @@ func (c *Repair) doRunOnce() error {
// pretend it is still in use until count expires
c.leaks[port] = count - 1
if err := rebuilt.Allocate(port); err != nil {
// do not increment the metric here, if it is a leak it will be detected once the counter gets to 0
runtime.HandleError(fmt.Errorf("the node port %d may have leaked, but can not be allocated: %v", port, err))
}
default:
nodePortRepairPortErrors.WithLabelValues("leak").Inc()
// do not add it to the rebuilt set, which means it will be available for reuse
runtime.HandleError(fmt.Errorf("the node port %d appears to have leaked: cleaning up", port))
}

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator" "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
) )
@ -53,6 +54,7 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
}
func TestRepair(t *testing.T) {
clearMetrics()
fakeClient := fake.NewSimpleClientset()
registry := &mockRangeRegistry{
item: &api.RangeAllocation{Range: "100-200"},
@ -66,6 +68,13 @@ func TestRepair(t *testing.T) {
if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
t.Errorf("unexpected registry: %#v", registry)
}
repairErrors, err := testutil.GetCounterMetricValue(nodePortRepairReconcileErrors)
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairReconcileErrors.Name, err)
}
if repairErrors != 0 {
t.Fatalf("0 error expected, got %v", repairErrors)
}
registry = &mockRangeRegistry{ registry = &mockRangeRegistry{
item: &api.RangeAllocation{Range: "100-200"}, item: &api.RangeAllocation{Range: "100-200"},
@ -75,9 +84,18 @@ func TestRepair(t *testing.T) {
if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
repairErrors, err = testutil.GetCounterMetricValue(nodePortRepairReconcileErrors)
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairReconcileErrors.Name, err)
}
if repairErrors != 1 {
t.Fatalf("1 error expected, got %v", repairErrors)
}
}
func TestRepairLeak(t *testing.T) {
clearMetrics()
pr, _ := net.ParsePortRange("100-200")
previous, err := portallocator.NewInMemory(*pr)
if err != nil {
@ -127,9 +145,18 @@ func TestRepairLeak(t *testing.T) {
if after.Has(111) {
t.Errorf("expected portallocator to not have leaked port")
}
em := testMetrics{
leak: 1,
repair: 0,
outOfRange: 0,
duplicate: 0,
unknown: 0,
}
expectMetrics(t, em)
}
func TestRepairWithExisting(t *testing.T) {
clearMetrics()
pr, _ := net.ParsePortRange("100-200")
previous, err := portallocator.NewInMemory(*pr)
if err != nil {
@ -204,6 +231,14 @@ func TestRepairWithExisting(t *testing.T) {
if free := after.Free(); free != 97 {
t.Errorf("unexpected portallocator state: %d free", free)
}
em := testMetrics{
leak: 0,
repair: 4,
outOfRange: 1,
duplicate: 1,
unknown: 0,
}
expectMetrics(t, em)
}
func TestCollectServiceNodePorts(t *testing.T) {
@ -303,3 +338,51 @@ func TestCollectServiceNodePorts(t *testing.T) {
})
}
}
// Metrics helpers
func clearMetrics() {
nodePortRepairPortErrors.Reset()
nodePortRepairReconcileErrors.Reset()
}
type testMetrics struct {
leak float64
repair float64
outOfRange float64
duplicate float64
unknown float64
full float64
}
func expectMetrics(t *testing.T, em testMetrics) {
var m testMetrics
var err error
m.leak, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("leak"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
}
m.repair, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("repair"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
}
m.outOfRange, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("outOfRange"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
}
m.duplicate, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("duplicate"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
}
m.unknown, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("unknown"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
}
m.full, err = testutil.GetCounterMetricValue(nodePortRepairPortErrors.WithLabelValues("full"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", nodePortRepairPortErrors.Name, err)
}
if m != em {
t.Fatalf("metrics error: expected %v, received %v", em, m)
}
}