From b525f9e0ed0003471438fb42fa37ff4ebe36d653 Mon Sep 17 00:00:00 2001 From: Damien Grisonnet Date: Tue, 10 Nov 2020 16:28:06 +0100 Subject: [PATCH] kube-aggregator: fix apiservice availability gauge When an apiservice is deleted, its corresponding aggregator_unavailable_apiservice metric remains with the value of the last availability observed. Hence, if an apiservice is deleted while being unavailable, the metric remains marked as unavailable. This presents some problems when alerting on unavailable apiservices as deleted apiservices might trigger the alert indefinitely. To solve this issue, the aggregator_unavailable_apiservice metric should only reflect the availability of existing apiservices. This is achievable by using a custom Collector instead of a GaugeVec and creating throw-away metrics based on an apiservice lister output. With this approach, on deletion, the apiservice will not be listed anymore, resulting in its availability metric not being exposed. Signed-off-by: Damien Grisonnet --- .../pkg/controllers/status/BUILD | 6 +- .../status/available_controller.go | 54 ++++--- .../status/available_controller_test.go | 11 +- .../pkg/controllers/status/metrics.go | 132 +++++++++++++++--- .../pkg/controllers/status/metrics_test.go | 57 ++++++++ 5 files changed, 221 insertions(+), 39 deletions(-) create mode 100644 staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD index 443aee80b15..6a3197382f0 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD @@ -41,7 +41,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["available_controller_test.go"], + srcs = [ + "available_controller_test.go", + "metrics_test.go", + ], embed = [":go_default_library"], deps = [ 
"//staging/src/k8s.io/api/core/v1:go_default_library", @@ -50,6 +53,7 @@ go_test( "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 525d8c98c63..5d91c85fa66 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -43,6 +43,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" + "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" @@ -52,6 +53,9 @@ import ( "k8s.io/kube-aggregator/pkg/controllers" ) +// making sure we only register metrics once into legacy registry +var registerIntoLegacyRegistryOnce sync.Once + type certKeyFunc func() ([]byte, []byte) // ServiceResolver knows how to convert a service reference into an actual location. @@ -93,6 +97,9 @@ type AvailableConditionController struct { // NOTE: the cache works because we assume that the transports constructed // by the controller only vary on the dynamic cert/key. 
tlsCache *tlsTransportCache + + // metrics registered into legacy registry + metrics *availabilityMetrics } type tlsTransportCache struct { @@ -163,6 +170,7 @@ func NewAvailableConditionController( "AvailableConditionController"), proxyCurrentCertKeyContent: proxyCurrentCertKeyContent, tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, + metrics: newAvailabilityMetrics(), } if egressSelector != nil { @@ -203,12 +211,22 @@ func NewAvailableConditionController( c.syncFn = c.sync + // TODO: decouple from legacyregistry + var err error + registerIntoLegacyRegistryOnce.Do(func() { + err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister) + }) + if err != nil { + return nil, err + } + return c, nil } func (c *AvailableConditionController) sync(key string) error { originalAPIService, err := c.apiServiceLister.Get(key) if apierrors.IsNotFound(err) { + c.metrics.ForgetAPIService(key) return nil } if err != nil { @@ -259,7 +277,7 @@ func (c *AvailableConditionController) sync(key string) error { // local API services are always considered available if apiService.Spec.Service == nil { apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition()) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } @@ -269,14 +287,14 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "ServiceNotFound" availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } else if err != nil { 
availableCondition.Status = apiregistrationv1.ConditionUnknown availableCondition.Reason = "ServiceAccessError" availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } @@ -297,7 +315,7 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "ServicePortError" availableCondition.Message = fmt.Sprintf("service/%s in %q is not listening on port %d", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, *apiService.Spec.Service.Port) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } @@ -307,14 +325,14 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "EndpointsNotFound" availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } else if err != nil { availableCondition.Status = apiregistrationv1.ConditionUnknown availableCondition.Reason = "EndpointsAccessError" availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, 
availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } hasActiveEndpoints := false @@ -335,7 +353,7 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "MissingEndpoints" availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } } @@ -413,7 +431,7 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "FailedDiscoveryCheck" availableCondition.Message = lastError.Error() apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, updateErr := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService) if updateErr != nil { return updateErr } @@ -426,26 +444,26 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "Passed" availableCondition.Message = "all checks passed" apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err = updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err = c.updateAPIServiceStatus(originalAPIService, apiService) return err } // updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead // apiservices. Doing that means we don't want to quickly issue no-op updates. 
-func updateAPIServiceStatus(client apiregistrationclient.APIServicesGetter, originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) { +func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) { // update this metric on every sync operation to reflect the actual state - setUnavailableGauge(newAPIService) + c.setUnavailableGauge(newAPIService) if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) { return newAPIService, nil } - newAPIService, err := client.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{}) + newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{}) if err != nil { return nil, err } - setUnavailableCounter(originalAPIService, newAPIService) + c.setUnavailableCounter(originalAPIService, newAPIService) return newAPIService, nil } @@ -630,17 +648,17 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) { } // setUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service -func setUnavailableGauge(newAPIService *apiregistrationv1.APIService) { +func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) { if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) { - unavailableGauge.WithLabelValues(newAPIService.Name).Set(0.0) + c.metrics.SetAPIServiceAvailable(newAPIService.Name) return } - unavailableGauge.WithLabelValues(newAPIService.Name).Set(1.0) + c.metrics.SetAPIServiceUnavailable(newAPIService.Name) } // setUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed -func setUnavailableCounter(originalAPIService, newAPIService 
*apiregistrationv1.APIService) { +func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) { wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available) isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) statusChanged := isAvailable != wasAvailable @@ -650,6 +668,6 @@ func setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1. if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil { reason = newCondition.Reason } - unavailableCounter.WithLabelValues(newAPIService.Name, reason).Inc() + c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc() } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go index 1bb0b403f7f..46dc74877ff 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go @@ -135,6 +135,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second), "AvailableConditionController"), tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, + metrics: newAvailabilityMetrics(), } for _, svc := range apiServices { c.addAPIService(svc) @@ -408,6 +409,7 @@ func TestSync(t *testing.T) { serviceResolver: &fakeServiceResolver{url: testServer.URL}, proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, + metrics: newAvailabilityMetrics(), } 
c.sync(tc.apiServiceName) @@ -457,13 +459,18 @@ func TestUpdateAPIServiceStatus(t *testing.T) { bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}} fakeClient := fake.NewSimpleClientset() - updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, foo) + c := AvailableConditionController{ + apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), + metrics: newAvailabilityMetrics(), + } + + c.updateAPIServiceStatus(foo, foo) if e, a := 0, len(fakeClient.Actions()); e != a { t.Error(spew.Sdump(fakeClient.Actions())) } fakeClient.ClearActions() - updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, bar) + c.updateAPIServiceStatus(foo, bar) if e, a := 1, len(fakeClient.Actions()); e != a { t.Error(spew.Sdump(fakeClient.Actions())) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go index 3a601a9ad9d..524a8edb9aa 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go @@ -17,8 +17,9 @@ limitations under the License. package apiserver import ( + "sync" + "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" ) /* @@ -30,25 +31,120 @@ import ( * the metric stability policy. 
*/ var ( - unavailableCounter = metrics.NewCounterVec( - &metrics.CounterOpts{ - Name: "aggregator_unavailable_apiservice_total", - Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.", - StabilityLevel: metrics.ALPHA, - }, - []string{"name", "reason"}, - ) - unavailableGauge = metrics.NewGaugeVec( - &metrics.GaugeOpts{ - Name: "aggregator_unavailable_apiservice", - Help: "Gauge of APIServices which are marked as unavailable broken down by APIService name.", - StabilityLevel: metrics.ALPHA, - }, + unavailableGaugeDesc = metrics.NewDesc( + "aggregator_unavailable_apiservice", + "Gauge of APIServices which are marked as unavailable broken down by APIService name.", []string{"name"}, + nil, + metrics.ALPHA, + "", ) ) -func init() { - legacyregistry.MustRegister(unavailableCounter) - legacyregistry.MustRegister(unavailableGauge) +type availabilityMetrics struct { + unavailableCounter *metrics.CounterVec + + *availabilityCollector +} + +func newAvailabilityMetrics() *availabilityMetrics { + return &availabilityMetrics{ + unavailableCounter: metrics.NewCounterVec( + &metrics.CounterOpts{ + Name: "aggregator_unavailable_apiservice_total", + Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.", + StabilityLevel: metrics.ALPHA, + }, + []string{"name", "reason"}, + ), + availabilityCollector: newAvailabilityCollector(), + } +} + +// Register registers apiservice availability metrics. +func (m *availabilityMetrics) Register( + registrationFunc func(metrics.Registerable) error, + customRegistrationFunc func(metrics.StableCollector) error, +) error { + err := registrationFunc(m.unavailableCounter) + if err != nil { + return err + } + + err = customRegistrationFunc(m.availabilityCollector) + if err != nil { + return err + } + + return nil +} + +// UnavailableCounter returns a counter to track apiservices marked as unavailable. 
+func (m *availabilityMetrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric { + return m.unavailableCounter.WithLabelValues(apiServiceName, reason) +} + +type availabilityCollector struct { + metrics.BaseStableCollector + + mtx sync.RWMutex + availabilities map[string]bool +} + +// Check if availabilityCollector implements the necessary interface. +var _ metrics.StableCollector = &availabilityCollector{} + +func newAvailabilityCollector() *availabilityCollector { + return &availabilityCollector{ + availabilities: make(map[string]bool), + } +} + +// DescribeWithStability implements the metrics.StableCollector interface. +func (c *availabilityCollector) DescribeWithStability(ch chan<- *metrics.Desc) { + ch <- unavailableGaugeDesc +} + +// CollectWithStability implements the metrics.StableCollector interface. +func (c *availabilityCollector) CollectWithStability(ch chan<- metrics.Metric) { + c.mtx.RLock() + defer c.mtx.RUnlock() + + for apiServiceName, isAvailable := range c.availabilities { + gaugeValue := 1.0 + if isAvailable { + gaugeValue = 0.0 + } + ch <- metrics.NewLazyConstMetric( + unavailableGaugeDesc, + metrics.GaugeValue, + gaugeValue, + apiServiceName, + ) + } +} + +// SetAPIServiceAvailable sets the given apiservice availability gauge to available. +func (c *availabilityCollector) SetAPIServiceAvailable(apiServiceKey string) { + c.setAPIServiceAvailability(apiServiceKey, true) +} + +// SetAPIServiceUnavailable sets the given apiservice availability gauge to unavailable. +func (c *availabilityCollector) SetAPIServiceUnavailable(apiServiceKey string) { + c.setAPIServiceAvailability(apiServiceKey, false) +} + +func (c *availabilityCollector) setAPIServiceAvailability(apiServiceKey string, availability bool) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.availabilities[apiServiceKey] = availability +} + +// ForgetAPIService removes the availability gauge of the given apiservice. 
+func (c *availabilityCollector) ForgetAPIService(apiServiceKey string) { + c.mtx.Lock() + defer c.mtx.Unlock() + + delete(c.availabilities, apiServiceKey) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go new file mode 100644 index 00000000000..8205bb010c2 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "strings" + "testing" + + "k8s.io/component-base/metrics/testutil" +) + +func TestAPIServiceAvailabilityCollection(t *testing.T) { + collector := newAvailabilityCollector() + + availableAPIService := "available" + unavailableAPIService := "unavailable" + + collector.SetAPIServiceAvailable(availableAPIService) + collector.SetAPIServiceUnavailable(unavailableAPIService) + + err := testutil.CustomCollectAndCompare(collector, strings.NewReader(` + # HELP aggregator_unavailable_apiservice [ALPHA] Gauge of APIServices which are marked as unavailable broken down by APIService name. 
+ # TYPE aggregator_unavailable_apiservice gauge + aggregator_unavailable_apiservice{name="available"} 0 + aggregator_unavailable_apiservice{name="unavailable"} 1 + `)) + if err != nil { + t.Fatal(err) + } + + collector.ClearState() + + collector.ForgetAPIService(availableAPIService) + collector.ForgetAPIService(unavailableAPIService) + + err = testutil.CustomCollectAndCompare(collector, strings.NewReader(` + # HELP aggregator_unavailable_apiservice [ALPHA] Gauge of APIServices which are marked as unavailable broken down by APIService name. + # TYPE aggregator_unavailable_apiservice gauge + `)) + if err != nil { + t.Fatal(err) + } +}