Merge pull request #96421 from dgrisonnet/fix-apiservice-availability

Fix aggregator_unavailable_apiservice gauge
This commit is contained in:
Kubernetes Prow Robot 2020-11-26 07:24:19 -08:00 committed by GitHub
commit 5ed4b76a03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 221 additions and 39 deletions

View File

@ -41,7 +41,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["available_controller_test.go"],
srcs = [
"available_controller_test.go",
"metrics_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -50,6 +53,7 @@ go_test(
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1:go_default_library",

View File

@ -43,6 +43,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
@ -52,6 +53,9 @@ import (
"k8s.io/kube-aggregator/pkg/controllers"
)
// making sure we only register metrics once into legacy registry
var registerIntoLegacyRegistryOnce sync.Once
type certKeyFunc func() ([]byte, []byte)
// ServiceResolver knows how to convert a service reference into an actual location.
@ -93,6 +97,9 @@ type AvailableConditionController struct {
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
tlsCache *tlsTransportCache
// metrics registered into legacy registry
metrics *availabilityMetrics
}
type tlsTransportCache struct {
@ -163,6 +170,7 @@ func NewAvailableConditionController(
"AvailableConditionController"),
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}
if egressSelector != nil {
@ -203,12 +211,22 @@ func NewAvailableConditionController(
c.syncFn = c.sync
// TODO: decouple from legacyregistry
var err error
registerIntoLegacyRegistryOnce.Do(func() {
err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister)
})
if err != nil {
return nil, err
}
return c, nil
}
func (c *AvailableConditionController) sync(key string) error {
originalAPIService, err := c.apiServiceLister.Get(key)
if apierrors.IsNotFound(err) {
c.metrics.ForgetAPIService(key)
return nil
}
if err != nil {
@ -259,7 +277,7 @@ func (c *AvailableConditionController) sync(key string) error {
// local API services are always considered available
if apiService.Spec.Service == nil {
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
@ -269,14 +287,14 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "ServiceNotFound"
availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
} else if err != nil {
availableCondition.Status = apiregistrationv1.ConditionUnknown
availableCondition.Reason = "ServiceAccessError"
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
@ -297,7 +315,7 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "ServicePortError"
availableCondition.Message = fmt.Sprintf("service/%s in %q is not listening on port %d", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, *apiService.Spec.Service.Port)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
@ -307,14 +325,14 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "EndpointsNotFound"
availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
} else if err != nil {
availableCondition.Status = apiregistrationv1.ConditionUnknown
availableCondition.Reason = "EndpointsAccessError"
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
hasActiveEndpoints := false
@ -335,7 +353,7 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "MissingEndpoints"
availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
}
@ -413,7 +431,7 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "FailedDiscoveryCheck"
availableCondition.Message = lastError.Error()
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, updateErr := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService)
if updateErr != nil {
return updateErr
}
@ -426,26 +444,26 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "Passed"
availableCondition.Message = "all checks passed"
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err = updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
// apiservices. Doing that means we don't want to quickly issue no-op updates.
func updateAPIServiceStatus(client apiregistrationclient.APIServicesGetter, originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
// update this metric on every sync operation to reflect the actual state
setUnavailableGauge(newAPIService)
c.setUnavailableGauge(newAPIService)
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
return newAPIService, nil
}
newAPIService, err := client.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
if err != nil {
return nil, err
}
setUnavailableCounter(originalAPIService, newAPIService)
c.setUnavailableCounter(originalAPIService, newAPIService)
return newAPIService, nil
}
@ -630,17 +648,17 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
}
// setUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service
func setUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) {
unavailableGauge.WithLabelValues(newAPIService.Name).Set(0.0)
c.metrics.SetAPIServiceAvailable(newAPIService.Name)
return
}
unavailableGauge.WithLabelValues(newAPIService.Name).Set(1.0)
c.metrics.SetAPIServiceUnavailable(newAPIService.Name)
}
// setUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed
func setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available)
isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available)
statusChanged := isAvailable != wasAvailable
@ -650,6 +668,6 @@ func setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.
if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil {
reason = newCondition.Reason
}
unavailableCounter.WithLabelValues(newAPIService.Name, reason).Inc()
c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc()
}
}

View File

@ -135,6 +135,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}
for _, svc := range apiServices {
c.addAPIService(svc)
@ -408,6 +409,7 @@ func TestSync(t *testing.T) {
serviceResolver: &fakeServiceResolver{url: testServer.URL},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}
c.sync(tc.apiServiceName)
@ -457,13 +459,18 @@ func TestUpdateAPIServiceStatus(t *testing.T) {
bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}}
fakeClient := fake.NewSimpleClientset()
updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, foo)
c := AvailableConditionController{
apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter),
metrics: newAvailabilityMetrics(),
}
c.updateAPIServiceStatus(foo, foo)
if e, a := 0, len(fakeClient.Actions()); e != a {
t.Error(spew.Sdump(fakeClient.Actions()))
}
fakeClient.ClearActions()
updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, bar)
c.updateAPIServiceStatus(foo, bar)
if e, a := 1, len(fakeClient.Actions()); e != a {
t.Error(spew.Sdump(fakeClient.Actions()))
}

View File

@ -17,8 +17,9 @@ limitations under the License.
package apiserver
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
/*
@ -30,25 +31,120 @@ import (
* the metric stability policy.
*/
var (
unavailableCounter = metrics.NewCounterVec(
unavailableGaugeDesc = metrics.NewDesc(
"aggregator_unavailable_apiservice",
"Gauge of APIServices which are marked as unavailable broken down by APIService name.",
[]string{"name"},
nil,
metrics.ALPHA,
"",
)
)
type availabilityMetrics struct {
unavailableCounter *metrics.CounterVec
*availabilityCollector
}
func newAvailabilityMetrics() *availabilityMetrics {
return &availabilityMetrics{
unavailableCounter: metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "aggregator_unavailable_apiservice_total",
Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.",
StabilityLevel: metrics.ALPHA,
},
[]string{"name", "reason"},
)
unavailableGauge = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Name: "aggregator_unavailable_apiservice",
Help: "Gauge of APIServices which are marked as unavailable broken down by APIService name.",
StabilityLevel: metrics.ALPHA,
},
[]string{"name"},
)
)
func init() {
legacyregistry.MustRegister(unavailableCounter)
legacyregistry.MustRegister(unavailableGauge)
),
availabilityCollector: newAvailabilityCollector(),
}
}
// Register registers apiservice availability metrics.
func (m *availabilityMetrics) Register(
registrationFunc func(metrics.Registerable) error,
customRegistrationFunc func(metrics.StableCollector) error,
) error {
err := registrationFunc(m.unavailableCounter)
if err != nil {
return err
}
err = customRegistrationFunc(m.availabilityCollector)
if err != nil {
return err
}
return nil
}
// UnavailableCounter returns a counter to track apiservices marked as unavailable.
func (m *availabilityMetrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric {
return m.unavailableCounter.WithLabelValues(apiServiceName, reason)
}
type availabilityCollector struct {
metrics.BaseStableCollector
mtx sync.RWMutex
availabilities map[string]bool
}
// Check if apiServiceStatusCollector implements necessary interface.
var _ metrics.StableCollector = &availabilityCollector{}
func newAvailabilityCollector() *availabilityCollector {
return &availabilityCollector{
availabilities: make(map[string]bool),
}
}
// DescribeWithStability implements the metrics.StableCollector interface.
func (c *availabilityCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
ch <- unavailableGaugeDesc
}
// CollectWithStability implements the metrics.StableCollector interface.
func (c *availabilityCollector) CollectWithStability(ch chan<- metrics.Metric) {
c.mtx.RLock()
defer c.mtx.RUnlock()
for apiServiceName, isAvailable := range c.availabilities {
gaugeValue := 1.0
if isAvailable {
gaugeValue = 0.0
}
ch <- metrics.NewLazyConstMetric(
unavailableGaugeDesc,
metrics.GaugeValue,
gaugeValue,
apiServiceName,
)
}
}
// SetAPIServiceAvailable sets the given apiservice availability gauge to available.
func (c *availabilityCollector) SetAPIServiceAvailable(apiServiceKey string) {
c.setAPIServiceAvailability(apiServiceKey, true)
}
// SetAPIServiceUnavailable sets the given apiservice availability gauge to unavailable.
func (c *availabilityCollector) SetAPIServiceUnavailable(apiServiceKey string) {
c.setAPIServiceAvailability(apiServiceKey, false)
}
func (c *availabilityCollector) setAPIServiceAvailability(apiServiceKey string, availability bool) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.availabilities[apiServiceKey] = availability
}
// ForgetAPIService removes the availability gauge of the given apiservice.
func (c *availabilityCollector) ForgetAPIService(apiServiceKey string) {
c.mtx.Lock()
defer c.mtx.Unlock()
delete(c.availabilities, apiServiceKey)
}

View File

@ -0,0 +1,57 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"strings"
"testing"
"k8s.io/component-base/metrics/testutil"
)
func TestAPIServiceAvailabilityCollection(t *testing.T) {
collector := newAvailabilityCollector()
availableAPIService := "available"
unavailableAPIService := "unavailable"
collector.SetAPIServiceAvailable(availableAPIService)
collector.SetAPIServiceUnavailable(unavailableAPIService)
err := testutil.CustomCollectAndCompare(collector, strings.NewReader(`
# HELP aggregator_unavailable_apiservice [ALPHA] Gauge of APIServices which are marked as unavailable broken down by APIService name.
# TYPE aggregator_unavailable_apiservice gauge
aggregator_unavailable_apiservice{name="available"} 0
aggregator_unavailable_apiservice{name="unavailable"} 1
`))
if err != nil {
t.Fatal(err)
}
collector.ClearState()
collector.ForgetAPIService(availableAPIService)
collector.ForgetAPIService(unavailableAPIService)
err = testutil.CustomCollectAndCompare(collector, strings.NewReader(`
# HELP aggregator_unavailable_apiservice [ALPHA] Gauge of APIServices which are marked as unavailable broken down by APIService name.
# TYPE aggregator_unavailable_apiservice gauge
`))
if err != nil {
t.Fatal(err)
}
}