mirror of
https://github.com/rancher/steve.git
synced 2025-04-27 11:00:48 +00:00
commit
a91d90251f
1
go.mod
1
go.mod
@ -17,6 +17,7 @@ require (
|
||||
github.com/imdario/mergo v0.3.8 // indirect
|
||||
github.com/pborman/uuid v1.2.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v1.7.1
|
||||
github.com/rancher/apiserver v0.0.0-20210922180056-297b6df8d714
|
||||
github.com/rancher/dynamiclistener v0.2.1-0.20200714201033-9c1939da3af9
|
||||
github.com/rancher/kubernetes-provider-detector v0.1.2
|
||||
|
100
pkg/metrics/metrics.go
Normal file
100
pkg/metrics/metrics.go
Normal file
@ -0,0 +1,100 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/rancher/apiserver/pkg/apierror"
|
||||
)
|
||||
|
||||
type MetricLogger struct {
|
||||
Resource string
|
||||
Method string
|
||||
}
|
||||
|
||||
var prometheusMetrics = false
|
||||
|
||||
const (
|
||||
resourceLabel = "resource"
|
||||
methodLabel = "method"
|
||||
codeLabel = "code"
|
||||
)
|
||||
|
||||
var (
|
||||
// https://prometheus.io/docs/practices/instrumentation/#use-labels explains logic of having 1 total_requests
|
||||
// counter with code label vs a counter for each code
|
||||
|
||||
ProxyTotalResponses = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Subsystem: "k8s_proxy",
|
||||
Name: "total_requests",
|
||||
Help: "Total count API requests",
|
||||
},
|
||||
[]string{resourceLabel, methodLabel, codeLabel},
|
||||
)
|
||||
K8sClientResponseTime = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: "k8s_proxy",
|
||||
Name: "client_request_time",
|
||||
Help: "Request times in ms for k8s client from proxy store",
|
||||
},
|
||||
[]string{resourceLabel, methodLabel, codeLabel})
|
||||
ProxyStoreResponseTime = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: "k8s_proxy",
|
||||
Name: "store_request_time",
|
||||
Help: "Request times in ms for k8s proxy store",
|
||||
},
|
||||
[]string{resourceLabel, methodLabel, codeLabel})
|
||||
)
|
||||
|
||||
func (m MetricLogger) IncTotalResponses(err error) {
|
||||
if prometheusMetrics {
|
||||
ProxyTotalResponses.With(
|
||||
prometheus.Labels{
|
||||
resourceLabel: m.Resource,
|
||||
methodLabel: m.Method,
|
||||
codeLabel: m.getAPIErrorCode(err),
|
||||
},
|
||||
).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func (m MetricLogger) RecordK8sClientResponseTime(err error, val float64) {
|
||||
if prometheusMetrics {
|
||||
K8sClientResponseTime.With(
|
||||
prometheus.Labels{
|
||||
resourceLabel: m.Resource,
|
||||
methodLabel: m.Method,
|
||||
codeLabel: m.getAPIErrorCode(err),
|
||||
},
|
||||
).Observe(val)
|
||||
}
|
||||
}
|
||||
|
||||
func (m MetricLogger) RecordProxyStoreResponseTime(err error, val float64) {
|
||||
if prometheusMetrics {
|
||||
ProxyStoreResponseTime.With(
|
||||
prometheus.Labels{
|
||||
resourceLabel: m.Resource,
|
||||
methodLabel: m.Method,
|
||||
codeLabel: m.getAPIErrorCode(err),
|
||||
},
|
||||
).Observe(val)
|
||||
}
|
||||
}
|
||||
|
||||
func (m MetricLogger) getAPIErrorCode(err error) string {
|
||||
successCode := "200"
|
||||
if m.Method == http.MethodPost {
|
||||
successCode = "201"
|
||||
}
|
||||
if err == nil {
|
||||
return successCode
|
||||
}
|
||||
if apiError, ok := err.(*apierror.APIError); ok {
|
||||
return strconv.Itoa(apiError.Code.Status)
|
||||
}
|
||||
return "500"
|
||||
}
|
18
pkg/metrics/register.go
Normal file
18
pkg/metrics/register.go
Normal file
@ -0,0 +1,18 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const metricsEnv = "CATTLE_PROMETHEUS_METRICS"
|
||||
|
||||
func init() {
|
||||
if os.Getenv(metricsEnv) == "true" {
|
||||
prometheusMetrics = true
|
||||
prometheus.MustRegister(ProxyTotalResponses)
|
||||
prometheus.MustRegister(K8sClientResponseTime)
|
||||
prometheus.MustRegister(ProxyStoreResponseTime)
|
||||
}
|
||||
}
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/rancher/steve/pkg/accesscontrol"
|
||||
"github.com/rancher/steve/pkg/attributes"
|
||||
"github.com/rancher/steve/pkg/schema"
|
||||
metricsStore "github.com/rancher/steve/pkg/stores/metrics"
|
||||
"github.com/rancher/steve/pkg/stores/proxy"
|
||||
"github.com/rancher/steve/pkg/summarycache"
|
||||
"github.com/rancher/wrangler/pkg/data"
|
||||
@ -22,7 +23,7 @@ func DefaultTemplate(clientGetter proxy.ClientGetter,
|
||||
summaryCache *summarycache.SummaryCache,
|
||||
asl accesscontrol.AccessSetLookup) schema.Template {
|
||||
return schema.Template{
|
||||
Store: proxy.NewProxyStore(clientGetter, summaryCache, asl),
|
||||
Store: metricsStore.NewMetricsStore(proxy.NewProxyStore(clientGetter, summaryCache, asl)),
|
||||
Formatter: formatter(summaryCache),
|
||||
}
|
||||
}
|
||||
|
82
pkg/stores/metrics/metrics_client.go
Normal file
82
pkg/stores/metrics/metrics_client.go
Normal file
@ -0,0 +1,82 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
"github.com/rancher/steve/pkg/metrics"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
k8stypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/dynamic"
|
||||
)
|
||||
|
||||
type ResourceClientWithMetrics struct {
|
||||
dynamic.ResourceInterface
|
||||
}
|
||||
|
||||
func Wrap(resourceInterface dynamic.ResourceInterface, err error) (ResourceClientWithMetrics, error) {
|
||||
client := ResourceClientWithMetrics{}
|
||||
if err != nil {
|
||||
return client, err
|
||||
}
|
||||
client.ResourceInterface = resourceInterface
|
||||
return client, err
|
||||
}
|
||||
|
||||
func (r ResourceClientWithMetrics) Create(apiOp *types.APIRequest, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
start := time.Now()
|
||||
obj, err := r.ResourceInterface.Create(apiOp.Context(), obj, options, subresources...)
|
||||
m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds()))
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (r ResourceClientWithMetrics) Update(apiOp *types.APIRequest, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
start := time.Now()
|
||||
obj, err := r.ResourceInterface.Update(apiOp.Context(), obj, options, subresources...)
|
||||
m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds()))
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (r ResourceClientWithMetrics) Delete(apiOp *types.APIRequest, name string, options metav1.DeleteOptions, subresources ...string) error {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
start := time.Now()
|
||||
err := r.ResourceInterface.Delete(apiOp.Context(), name, options, subresources...)
|
||||
m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds()))
|
||||
return err
|
||||
}
|
||||
|
||||
func (r ResourceClientWithMetrics) Get(apiOp *types.APIRequest, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
start := time.Now()
|
||||
obj, err := r.ResourceInterface.Get(apiOp.Context(), name, options, subresources...)
|
||||
m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds()))
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (r ResourceClientWithMetrics) List(apiOp *types.APIRequest, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
start := time.Now()
|
||||
obj, err := r.ResourceInterface.List(apiOp.Context(), opts)
|
||||
m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds()))
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (r ResourceClientWithMetrics) Watch(apiOp *types.APIRequest, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
start := time.Now()
|
||||
watchInterface, err := r.ResourceInterface.Watch(apiOp.Context(), opts)
|
||||
m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds()))
|
||||
return watchInterface, err
|
||||
}
|
||||
|
||||
func (r ResourceClientWithMetrics) Patch(apiOp *types.APIRequest, name string, pt k8stypes.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
start := time.Now()
|
||||
obj, err := r.ResourceInterface.Patch(apiOp.Context(), name, pt, data, options, subresources...)
|
||||
m.RecordK8sClientResponseTime(err, float64(time.Since(start).Milliseconds()))
|
||||
return obj, err
|
||||
}
|
66
pkg/stores/metrics/metrics_store.go
Normal file
66
pkg/stores/metrics/metrics_store.go
Normal file
@ -0,0 +1,66 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
"github.com/rancher/steve/pkg/metrics"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
Store types.Store
|
||||
}
|
||||
|
||||
func NewMetricsStore(store types.Store) *Store {
|
||||
return &Store{
|
||||
Store: store,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
storeStart := time.Now()
|
||||
apiObject, err := s.Store.ByID(apiOp, schema, id)
|
||||
m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds()))
|
||||
return apiObject, err
|
||||
}
|
||||
|
||||
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
storeStart := time.Now()
|
||||
apiObjectList, err := s.Store.List(apiOp, schema)
|
||||
m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds()))
|
||||
return apiObjectList, err
|
||||
}
|
||||
|
||||
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
storeStart := time.Now()
|
||||
apiObject, err := s.Store.Create(apiOp, schema, data)
|
||||
m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds()))
|
||||
return apiObject, err
|
||||
}
|
||||
|
||||
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
storeStart := time.Now()
|
||||
apiObject, err := s.Store.Update(apiOp, schema, data, id)
|
||||
m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds()))
|
||||
return apiObject, err
|
||||
}
|
||||
|
||||
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
storeStart := time.Now()
|
||||
apiObject, err := s.Store.Delete(apiOp, schema, id)
|
||||
m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds()))
|
||||
return apiObject, err
|
||||
}
|
||||
|
||||
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
|
||||
m := metrics.MetricLogger{Resource: apiOp.Schema.ID, Method: apiOp.Method}
|
||||
storeStart := time.Now()
|
||||
apiEvent, err := s.Store.Watch(apiOp, schema, w)
|
||||
m.RecordProxyStoreResponseTime(err, float64(time.Since(storeStart).Milliseconds()))
|
||||
return apiEvent, err
|
||||
}
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
"github.com/rancher/steve/pkg/accesscontrol"
|
||||
"github.com/rancher/steve/pkg/attributes"
|
||||
metricsStore "github.com/rancher/steve/pkg/stores/metrics"
|
||||
"github.com/rancher/steve/pkg/stores/partition"
|
||||
"github.com/rancher/wrangler/pkg/data"
|
||||
"github.com/rancher/wrangler/pkg/schemas/validation"
|
||||
@ -118,7 +119,7 @@ func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
|
||||
}
|
||||
|
||||
func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) {
|
||||
k8sClient, err := s.clientGetter.TableClient(apiOp, schema, namespace)
|
||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -128,7 +129,7 @@ func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj, err := k8sClient.Get(apiOp.Context(), id, opts)
|
||||
obj, err := k8sClient.Get(apiOp, id, opts)
|
||||
rowToObject(obj)
|
||||
return obj, err
|
||||
}
|
||||
@ -256,7 +257,8 @@ func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dy
|
||||
return types.APIObjectList{}, nil
|
||||
}
|
||||
|
||||
resultList, err := client.List(apiOp.Context(), opts)
|
||||
k8sClient, _ := metricsStore.Wrap(client, nil)
|
||||
resultList, err := k8sClient.List(apiOp, opts)
|
||||
if err != nil {
|
||||
return types.APIObjectList{}, err
|
||||
}
|
||||
@ -282,14 +284,15 @@ func returnErr(err error, c chan types.APIEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) {
|
||||
func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) {
|
||||
rev := w.Revision
|
||||
if rev == "-1" || rev == "0" {
|
||||
rev = ""
|
||||
}
|
||||
|
||||
timeout := int64(60 * 30)
|
||||
watcher, err := k8sClient.Watch(apiOp.Context(), metav1.ListOptions{
|
||||
k8sClient, _ := metricsStore.Wrap(client, nil)
|
||||
watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{
|
||||
Watch: true,
|
||||
TimeoutSeconds: &timeout,
|
||||
ResourceVersion: rev,
|
||||
@ -427,7 +430,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params
|
||||
gvk := attributes.GVK(schema)
|
||||
input["apiVersion"], input["kind"] = gvk.ToAPIVersionAndKind()
|
||||
|
||||
k8sClient, err := s.clientGetter.TableClient(apiOp, schema, ns)
|
||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
@ -437,9 +440,10 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
resp, err = k8sClient.Create(apiOp.Context(), &unstructured.Unstructured{Object: input}, opts)
|
||||
resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts)
|
||||
rowToObject(resp)
|
||||
return toAPI(schema, resp), err
|
||||
apiObject := toAPI(schema, resp)
|
||||
return apiObject, err
|
||||
}
|
||||
|
||||
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) {
|
||||
@ -449,7 +453,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
|
||||
)
|
||||
|
||||
ns := types.Namespace(input)
|
||||
k8sClient, err := s.clientGetter.TableClient(apiOp, schema, ns)
|
||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
@ -482,7 +486,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := k8sClient.Patch(apiOp.Context(), id, pType, bytes, opts)
|
||||
resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts)
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
@ -500,7 +504,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
resp, err := k8sClient.Update(apiOp.Context(), &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{})
|
||||
resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
@ -515,12 +519,12 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
|
||||
return types.APIObject{}, nil
|
||||
}
|
||||
|
||||
k8sClient, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)
|
||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace))
|
||||
if err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
if err := k8sClient.Delete(apiOp.Context(), id, opts); err != nil {
|
||||
if err := k8sClient.Delete(apiOp, id, opts); err != nil {
|
||||
return types.APIObject{}, err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user