From e9472ecc341084da5d1ba999b73d216c8ed97a53 Mon Sep 17 00:00:00 2001 From: Ricardo Weir Date: Wed, 2 Feb 2022 17:54:08 -0700 Subject: [PATCH] Add metrics --- go.mod | 1 + pkg/metrics/metrics.go | 100 +++++++++++++++++++++++++++ pkg/metrics/register.go | 18 +++++ pkg/resources/common/formatter.go | 3 +- pkg/stores/metrics/metrics_client.go | 82 ++++++++++++++++++++++ pkg/stores/metrics/metrics_store.go | 66 ++++++++++++++++++ pkg/stores/proxy/proxy_store.go | 30 ++++---- 7 files changed, 286 insertions(+), 14 deletions(-) create mode 100644 pkg/metrics/metrics.go create mode 100644 pkg/metrics/register.go create mode 100644 pkg/stores/metrics/metrics_client.go create mode 100644 pkg/stores/metrics/metrics_store.go diff --git a/go.mod b/go.mod index 037a6ad1..8db8af03 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.7.1 github.com/rancher/apiserver v0.0.0-20210922180056-297b6df8d714 github.com/rancher/dynamiclistener v0.2.1-0.20200714201033-9c1939da3af9 github.com/rancher/kubernetes-provider-detector v0.1.2 diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 00000000..353e8e90 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,100 @@ +package metrics + +import ( + "net/http" + "strconv" + + "github.com/prometheus/client_golang/prometheus" + "github.com/rancher/apiserver/pkg/apierror" +) + +type MetricLogger struct { + Resource string + Method string +} + +var prometheusMetrics = false + +const ( + resourceLabel = "resource" + methodLabel = "method" + codeLabel = "code" +) + +var ( + // https://prometheus.io/docs/practices/instrumentation/#use-labels explains logic of having 1 total_requests + // counter with code label vs a counter for each code + + ProxyTotalResponses = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: "k8s_proxy", + Name: "total_requests", + Help: "Total count API requests", + }, + []string{resourceLabel, methodLabel, codeLabel}, + ) + K8sClientResponseTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: "k8s_proxy", + Name: "client_request_time", + Help: "Request times in ms for k8s client from proxy store", + }, + []string{resourceLabel, methodLabel, codeLabel}) + ProxyStoreResponseTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: "k8s_proxy", + Name: "store_request_time", + Help: "Request times in ms for k8s proxy store", + }, + []string{resourceLabel, methodLabel, codeLabel}) +) + +func (m MetricLogger) IncTotalResponses(err error) { + if prometheusMetrics { + ProxyTotalResponses.With( + prometheus.Labels{ + resourceLabel: m.Resource, + methodLabel: m.Method, + codeLabel: m.getAPIErrorCode(err), + }, + ).Inc() + } +} + +func (m MetricLogger) RecordK8sClientResponseTime(err error, val float64) { + if prometheusMetrics { + K8sClientResponseTime.With( + prometheus.Labels{ + resourceLabel: m.Resource, + methodLabel: m.Method, + codeLabel: m.getAPIErrorCode(err), + }, + ).Observe(val) + } +} + +func (m MetricLogger) RecordProxyStoreResponseTime(err error, val float64) { + if prometheusMetrics { + ProxyStoreResponseTime.With( + prometheus.Labels{ + resourceLabel: m.Resource, + methodLabel: m.Method, + codeLabel: m.getAPIErrorCode(err), + }, + ).Observe(val) + } +} + +func (m MetricLogger) getAPIErrorCode(err error) string { + successCode := "200" + if m.Method == http.MethodPost { + successCode = "201" + } + if err == nil { + return successCode + } + if apiError, ok := err.(*apierror.APIError); ok { + return strconv.Itoa(apiError.Code.Status) + } + return "500" +} diff --git a/pkg/metrics/register.go b/pkg/metrics/register.go new file mode 100644 index 00000000..e9ef95ea --- /dev/null +++ b/pkg/metrics/register.go @@ -0,0 +1,18 @@ +package metrics + +import ( + "os" + + "github.com/prometheus/client_golang/prometheus" +) + +const metricsEnv = "CATTLE_PROMETHEUS_METRICS" + +func init() { + if os.Getenv(metricsEnv) == "true" { + prometheusMetrics = true + prometheus.MustRegister(ProxyTotalResponses) + prometheus.MustRegister(K8sClientResponseTime) + prometheus.MustRegister(ProxyStoreResponseTime) + } +} diff --git a/pkg/resources/common/formatter.go b/pkg/resources/common/formatter.go index 7ea90b22..cc286161 100644 --- a/pkg/resources/common/formatter.go +++ b/pkg/resources/common/formatter.go @@ -7,6 +7,7 @@ import ( "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/schema" + metricsStore "github.com/rancher/steve/pkg/stores/metrics" "github.com/rancher/steve/pkg/stores/proxy" "github.com/rancher/steve/pkg/summarycache" "github.com/rancher/wrangler/pkg/data" @@ -22,7 +23,7 @@ func DefaultTemplate(clientGetter proxy.ClientGetter, summaryCache *summarycache.SummaryCache, asl accesscontrol.AccessSetLookup) schema.Template { return schema.Template{ - Store: proxy.NewProxyStore(clientGetter, summaryCache, asl), + Store: metricsStore.NewMetricsStore(proxy.NewProxyStore(clientGetter, summaryCache, asl)), Formatter: formatter(summaryCache), } } diff --git a/pkg/stores/metrics/metrics_client.go b/pkg/stores/metrics/metrics_client.go new file mode 100644 index 00000000..6fe79d02 --- /dev/null +++ b/pkg/stores/metrics/metrics_client.go @@ -0,0 +1,82 @@ +package metrics + +import ( + "time" + + "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/metrics" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" +) + +type ResourceClientWithMetrics struct { + dynamic.ResourceInterface +} + +func Wrap(resourceInterface dynamic.ResourceInterface, err error) (ResourceClientWithMetrics, error) { + client := ResourceClientWithMetrics{} + if err != nil { + return client, err + } + client.ResourceInterface = resourceInterface + return client, err +} + +func (r ResourceClientWithMetrics) Create(apiOp *types.APIRequest, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + start := time.Now() + obj, err := r.ResourceInterface.Create(apiOp.Context(), obj, options, subresources...) + m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds())) + return obj, err +} + +func (r ResourceClientWithMetrics) Update(apiOp *types.APIRequest, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + start := time.Now() + obj, err := r.ResourceInterface.Update(apiOp.Context(), obj, options, subresources...) + m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds())) + return obj, err +} + +func (r ResourceClientWithMetrics) Delete(apiOp *types.APIRequest, name string, options metav1.DeleteOptions, subresources ...string) error { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + start := time.Now() + err := r.ResourceInterface.Delete(apiOp.Context(), name, options, subresources...) + m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds())) + return err +} + +func (r ResourceClientWithMetrics) Get(apiOp *types.APIRequest, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + start := time.Now() + obj, err := r.ResourceInterface.Get(apiOp.Context(), name, options, subresources...) + m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds())) + return obj, err +} + +func (r ResourceClientWithMetrics) List(apiOp *types.APIRequest, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + start := time.Now() + obj, err := r.ResourceInterface.List(apiOp.Context(), opts) + m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds())) + return obj, err +} + +func (r ResourceClientWithMetrics) Watch(apiOp *types.APIRequest, opts metav1.ListOptions) (watch.Interface, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + start := time.Now() + watchInterface, err := r.ResourceInterface.Watch(apiOp.Context(), opts) + m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds())) + return watchInterface, err +} + +func (r ResourceClientWithMetrics) Patch(apiOp *types.APIRequest, name string, pt k8stypes.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + start := time.Now() + obj, err := r.ResourceInterface.Patch(apiOp.Context(), name, pt, data, options, subresources...) + m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds())) + return obj, err +} diff --git a/pkg/stores/metrics/metrics_store.go b/pkg/stores/metrics/metrics_store.go new file mode 100644 index 00000000..1e316a1d --- /dev/null +++ b/pkg/stores/metrics/metrics_store.go @@ -0,0 +1,66 @@ +package metrics + +import ( + "time" + + "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/metrics" +) + +type Store struct { + Store types.Store +} + +func NewMetricsStore(store types.Store) *Store { + return &Store{ + Store: store, + } +} + +func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + storeStart := time.Now() + apiObject, err := s.Store.ByID(apiOp, schema, id) + m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds())) + return apiObject, err +} + +func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + storeStart := time.Now() + apiObjectList, err := s.Store.List(apiOp, schema) + m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds())) + return apiObjectList, err +} + +func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + storeStart := time.Now() + apiObject, err := s.Store.Create(apiOp, schema, data) + m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds())) + return apiObject, err +} + +func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + storeStart := time.Now() + apiObject, err := s.Store.Update(apiOp, schema, data, id) + m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds())) + return apiObject, err +} + +func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + storeStart := time.Now() + apiObject, err := s.Store.Delete(apiOp, schema, id) + m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds())) + return apiObject, err +} + +func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { + m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method} + storeStart := time.Now() + apiEvent, err := s.Store.Watch(apiOp, schema, w) + m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds())) + return apiEvent, err +} \ No newline at end of file diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index 368f6878..b7079231 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -14,6 +14,7 @@ import ( "github.com/rancher/apiserver/pkg/types" "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/attributes" + metricsStore "github.com/rancher/steve/pkg/stores/metrics" "github.com/rancher/steve/pkg/stores/partition" "github.com/rancher/wrangler/pkg/data" "github.com/rancher/wrangler/pkg/schemas/validation" @@ -118,7 +119,7 @@ func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject { } func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) { - k8sClient, err := s.clientGetter.TableClient(apiOp, schema, namespace) + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace)) if err != nil { return nil, err } @@ -128,7 +129,7 @@ func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace return nil, err } - obj, err := k8sClient.Get(apiOp.Context(), id, opts) + obj, err := k8sClient.Get(apiOp, id, opts) rowToObject(obj) return obj, err } @@ -256,7 +257,8 @@ func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dy return types.APIObjectList{}, nil } - resultList, err := client.List(apiOp.Context(), opts) + k8sClient, _ := metricsStore.Wrap(client, nil) + resultList, err := k8sClient.List(apiOp, opts) if err != nil { return types.APIObjectList{}, err } @@ -282,14 +284,15 @@ func returnErr(err error, c chan types.APIEvent) { } } -func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) { +func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) { rev := w.Revision if rev == "-1" || rev == "0" { rev = "" } timeout := int64(60 * 30) - watcher, err := k8sClient.Watch(apiOp.Context(), metav1.ListOptions{ + k8sClient, _ := metricsStore.Wrap(client, nil) + watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{ Watch: true, TimeoutSeconds: &timeout, ResourceVersion: rev, @@ -427,7 +430,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params gvk := attributes.GVK(schema) input["apiVersion"], input["kind"] = gvk.ToAPIVersionAndKind() - k8sClient, err := s.clientGetter.TableClient(apiOp, schema, ns) + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) if err != nil { return types.APIObject{}, err } @@ -437,9 +440,10 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params return types.APIObject{}, err } - resp, err = k8sClient.Create(apiOp.Context(), &unstructured.Unstructured{Object: input}, opts) + resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts) rowToObject(resp) - return toAPI(schema, resp), err + apiObject := toAPI(schema, resp) + return apiObject, err } func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) { @@ -449,7 +453,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params ) ns := types.Namespace(input) - k8sClient, err := s.clientGetter.TableClient(apiOp, schema, ns) + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) if err != nil { return types.APIObject{}, err } @@ -482,7 +486,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params } } - resp, err := k8sClient.Patch(apiOp.Context(), id, pType, bytes, opts) + resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts) if err != nil { return types.APIObject{}, err } @@ -500,7 +504,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params return types.APIObject{}, err } - resp, err := k8sClient.Update(apiOp.Context(), &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{}) + resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{}) if err != nil { return types.APIObject{}, err } @@ -515,12 +519,12 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri return types.APIObject{}, nil } - k8sClient, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace) + k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)) if err != nil { return types.APIObject{}, err } - if err := k8sClient.Delete(apiOp.Context(), id, opts); err != nil { + if err := k8sClient.Delete(apiOp, id, opts); err != nil { return types.APIObject{}, err }