Collect prometheus metrics for custom resources

Since we have a custom handler for apiextensions-apiserver,
we need to record the metrics here.
This commit is contained in:
Nikhita Raghunath 2017-12-28 14:36:31 +05:30
parent b9361192b8
commit 74cd45fb21
4 changed files with 60 additions and 39 deletions

View File

@ -52,6 +52,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"path"
"strings"
"sync"
"sync/atomic"
"time"
@ -41,6 +42,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
@ -187,37 +189,34 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
requestScope := crdInfo.requestScope
minRequestTimeout := 1 * time.Minute
verb := strings.ToUpper(requestInfo.Verb)
resource := requestInfo.Resource
subresource := requestInfo.Subresource
scope := metrics.CleanScope(requestInfo)
var handler http.HandlerFunc
switch requestInfo.Verb {
case "get":
handler := handlers.GetResource(storage, storage, requestScope)
handler(w, req)
return
handler = handlers.GetResource(storage, storage, requestScope)
case "list":
forceWatch := false
handler := handlers.ListResource(storage, storage, requestScope, forceWatch, minRequestTimeout)
handler(w, req)
return
handler = handlers.ListResource(storage, storage, requestScope, forceWatch, minRequestTimeout)
case "watch":
forceWatch := true
handler := handlers.ListResource(storage, storage, requestScope, forceWatch, minRequestTimeout)
handler(w, req)
return
handler = handlers.ListResource(storage, storage, requestScope, forceWatch, minRequestTimeout)
case "create":
if terminating {
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
return
}
handler := handlers.CreateResource(storage, requestScope, discovery.NewUnstructuredObjectTyper(nil), r.admission)
handler(w, req)
return
handler = handlers.CreateResource(storage, requestScope, discovery.NewUnstructuredObjectTyper(nil), r.admission)
case "update":
if terminating {
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
return
}
handler := handlers.UpdateResource(storage, requestScope, discovery.NewUnstructuredObjectTyper(nil), r.admission)
handler(w, req)
return
handler = handlers.UpdateResource(storage, requestScope, discovery.NewUnstructuredObjectTyper(nil), r.admission)
case "patch":
if terminating {
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
@ -227,24 +226,20 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
string(types.JSONPatchType),
string(types.MergePatchType),
}
handler := handlers.PatchResource(storage, requestScope, r.admission, unstructured.UnstructuredObjectConverter{}, supportedTypes)
handler(w, req)
return
handler = handlers.PatchResource(storage, requestScope, r.admission, unstructured.UnstructuredObjectConverter{}, supportedTypes)
case "delete":
allowsOptions := true
handler := handlers.DeleteResource(storage, allowsOptions, requestScope, r.admission)
handler(w, req)
return
handler = handlers.DeleteResource(storage, allowsOptions, requestScope, r.admission)
case "deletecollection":
checkBody := true
handler := handlers.DeleteCollection(storage, checkBody, requestScope, r.admission)
handler(w, req)
return
handler = handlers.DeleteCollection(storage, checkBody, requestScope, r.admission)
default:
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
return
}
handler = metrics.InstrumentHandlerFunc(verb, resource, subresource, scope, handler)
handler(w, req)
return
}
func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{}) {

View File

@ -57,16 +57,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var httpCode int
var requestInfo *request.RequestInfo
defer func() {
responseLength := 0
if rw, ok := w.(*metrics.ResponseWriterDelegator); ok {
responseLength = rw.ContentLength()
if httpCode == 0 {
httpCode = rw.Status()
}
}
metrics.Record(req, requestInfo, w.Header().Get("Content-Type"), httpCode, responseLength, time.Now().Sub(reqStart))
}()
defer RecordMetrics(w, req, requestInfo, httpCode, reqStart)
ctx, ok := r.Mapper.Get(req)
if !ok {
@ -283,3 +274,14 @@ func singleJoiningSlash(a, b string) string {
}
return a + b
}
func RecordMetrics(w http.ResponseWriter, req *http.Request, requestInfo *request.RequestInfo, httpCode int, reqStart time.Time) {
responseLength := 0
if rw, ok := w.(*metrics.ResponseWriterDelegator); ok {
responseLength = rw.ContentLength()
if httpCode == 0 {
httpCode = rw.Status()
}
}
metrics.Record(req, requestInfo, w.Header().Get("Content-Type"), httpCode, responseLength, time.Now().Sub(reqStart))
}

View File

@ -128,7 +128,7 @@ func Record(req *http.Request, requestInfo *request.RequestInfo, contentType str
if requestInfo == nil {
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
}
scope := cleanScope(requestInfo)
scope := CleanScope(requestInfo)
if requestInfo.IsResourceRequest {
MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, contentType, scope, code, responseSizeInBytes, elapsed)
} else {
@ -143,7 +143,7 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, fn f
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
}
var g prometheus.Gauge
scope := cleanScope(requestInfo)
scope := CleanScope(requestInfo)
reportedVerb := cleanVerb(strings.ToUpper(requestInfo.Verb), req)
if requestInfo.IsResourceRequest {
g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.Resource, requestInfo.Subresource, scope)
@ -178,7 +178,7 @@ func Reset() {
}
// InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps
// the go-restful RouteFunction instead of a HandlerFunc
// the go-restful RouteFunction instead of a HandlerFunc plus some Kubernetes endpoint specific information.
func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc restful.RouteFunction) restful.RouteFunction {
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
now := time.Now()
@ -202,7 +202,30 @@ func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc re
})
}
func cleanScope(requestInfo *request.RequestInfo) string {
// InstrumentHandlerFunc works like Prometheus' InstrumentHandlerFunc but adds some Kubernetes endpoint specific information.
func InstrumentHandlerFunc(verb, resource, subresource, scope string, handler http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
now := time.Now()
delegate := &ResponseWriterDelegator{ResponseWriter: w}
_, cn := w.(http.CloseNotifier)
_, fl := w.(http.Flusher)
_, hj := w.(http.Hijacker)
if cn && fl && hj {
w = &fancyResponseWriterDelegator{delegate}
} else {
w = delegate
}
handler(w, req)
MonitorRequest(req, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), time.Now().Sub(now))
}
}
// CleanScope returns the scope of the request.
func CleanScope(requestInfo *request.RequestInfo) string {
if requestInfo.Namespace != "" {
return "namespace"
}