Track gauge of all long-running API requests

Allows a caller to know how many exec, log, proxy, and watch calls are
running at the current moment.
This commit is contained in:
Clayton Coleman
2017-09-19 20:35:29 -04:00
parent 10e6dc5ed3
commit fabce1b893
5 changed files with 106 additions and 45 deletions

View File

@@ -54,7 +54,6 @@ type ProxyHandler struct {
func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
reqStart := time.Now()
proxyHandlerTraceID := rand.Int63()
var httpCode int
var requestInfo *request.RequestInfo
@@ -62,6 +61,9 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responseLength := 0
if rw, ok := w.(*metrics.ResponseWriterDelegator); ok {
responseLength = rw.ContentLength()
if httpCode == 0 {
httpCode = rw.Status()
}
}
metrics.Record(req, requestInfo, w.Header().Get("Content-Type"), httpCode, responseLength, time.Now().Sub(reqStart))
}()
@@ -79,18 +81,26 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
httpCode = http.StatusInternalServerError
return
}
metrics.RecordLongRunning(req, requestInfo, func() {
httpCode = r.serveHTTP(w, req, ctx, requestInfo)
})
}
// serveHTTP performs proxy handling and returns the status code of the operation.
func (r *ProxyHandler) serveHTTP(w http.ResponseWriter, req *http.Request, ctx request.Context, requestInfo *request.RequestInfo) int {
proxyHandlerTraceID := rand.Int63()
if !requestInfo.IsResourceRequest {
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
return http.StatusNotFound
}
namespace, resource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Parts
ctx = request.WithNamespace(ctx, namespace)
if len(parts) < 2 {
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
return http.StatusNotFound
}
id := parts[1]
remainder := ""
@@ -108,8 +118,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !ok {
httplog.LogOf(req, w).Addf("'%v' has no storage object", resource)
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
return http.StatusNotFound
}
gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
@@ -117,21 +126,18 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
redirector, ok := storage.(rest.Redirector)
if !ok {
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
httpCode = responsewriters.ErrorNegotiated(ctx, apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
return
return responsewriters.ErrorNegotiated(ctx, apierrors.NewMethodNotSupported(schema.GroupResource{Resource: resource}, "proxy"), r.Serializer, gv, w, req)
}
location, roundTripper, err := redirector.ResourceLocation(ctx, id)
if err != nil {
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
httpCode = responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
return
return responsewriters.ErrorNegotiated(ctx, err, r.Serializer, gv, w, req)
}
if location == nil {
httplog.LogOf(req, w).Addf("ResourceLocation for %v returned nil", id)
responsewriters.NotFound(w, req)
httpCode = http.StatusNotFound
return
return http.StatusNotFound
}
if roundTripper != nil {
@@ -164,7 +170,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go.
// That proxy needs to be modified to support multiple backends, not just 1.
if r.tryUpgrade(ctx, w, req, newReq, location, roundTripper, gv) {
return
return http.StatusSwitchingProtocols
}
// Redirect requests of the form "/{resource}/{name}" to "/{resource}/{name}/"
@@ -177,7 +183,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
w.Header().Set("Location", req.URL.Path+"/"+queryPart)
w.WriteHeader(http.StatusMovedPermanently)
return
return http.StatusMovedPermanently
}
start := time.Now()
@@ -209,6 +215,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
proxy.Transport = roundTripper
proxy.FlushInterval = 200 * time.Millisecond
proxy.ServeHTTP(w, newReq)
return 0
}
// tryUpgrade returns true if the request was handled.

View File

@@ -42,6 +42,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",

View File

@@ -28,6 +28,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/flushwriter"
@@ -42,7 +43,10 @@ import (
func WriteObject(ctx request.Context, statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
StreamObject(ctx, statusCode, gv, s, stream, w, req)
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, func() {
StreamObject(ctx, statusCode, gv, s, stream, w, req)
})
return
}
WriteObjectNegotiated(ctx, s, gv, w, req, statusCode, object)

View File

@@ -46,6 +46,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
utiltrace "k8s.io/apiserver/pkg/util/trace"
@@ -248,12 +249,15 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
return
}
}
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
if err != nil {
scope.err(err, w, req)
return
}
handler.ServeHTTP(w, req)
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, func() {
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
if err != nil {
scope.err(err, w, req)
return
}
handler.ServeHTTP(w, req)
})
}
}
@@ -353,7 +357,10 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
scope.err(err, w, req)
return
}
serveWatch(watcher, scope, req, w, timeout)
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, func() {
serveWatch(watcher, scope, req, w, timeout)
})
return
}

View File

@@ -43,6 +43,13 @@ var (
},
[]string{"verb", "resource", "subresource", "scope", "client", "contentType", "code"},
)
longRunningRequestGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "apiserver_longrunning_gauge",
Help: "Gauge of all active long-running apiserver requests broken out by verb, API resource, and scope. Not all requests are tracked this way.",
},
[]string{"verb", "resource", "subresource", "scope"},
)
requestLatencies = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "apiserver_request_latencies",
@@ -77,6 +84,7 @@ var (
// Register all metrics.
func Register() {
prometheus.MustRegister(requestCounter)
prometheus.MustRegister(longRunningRequestGauge)
prometheus.MustRegister(requestLatencies)
prometheus.MustRegister(requestLatenciesSummary)
prometheus.MustRegister(responseSizes)
@@ -86,13 +94,10 @@ func Register() {
// processing. All API paths should use InstrumentRouteFunc implicitly. Use this instead of MonitorRequest if
// you already have a RequestInfo object.
func Record(req *http.Request, requestInfo *request.RequestInfo, contentType string, code int, responseSizeInBytes int, elapsed time.Duration) {
scope := "cluster"
if requestInfo.Namespace != "" {
scope = "namespace"
}
if requestInfo.Name != "" {
scope = "resource"
if requestInfo == nil {
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
}
scope := cleanScope(requestInfo)
if requestInfo.IsResourceRequest {
MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, contentType, scope, code, responseSizeInBytes, elapsed)
} else {
@@ -100,24 +105,30 @@ func Record(req *http.Request, requestInfo *request.RequestInfo, contentType str
}
}
// RecordLongRunning runs fn while counting it as an active long-running request.
// The matching longRunningRequestGauge series is incremented before fn is invoked
// and decremented by a deferred call once fn returns.
//
// requestInfo may be nil when the caller is outside the normal request flow; in
// that case a minimal RequestInfo is synthesized from the HTTP method and URL path
// of req.
func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, fn func()) {
	if requestInfo == nil {
		requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
	}

	scope := cleanScope(requestInfo)
	reportedVerb := cleanVerb(strings.ToUpper(requestInfo.Verb), req)

	// Non-resource requests carry no resource name; their path is reported in the
	// subresource label instead.
	resource, subresource := "", requestInfo.Path
	if requestInfo.IsResourceRequest {
		resource, subresource = requestInfo.Resource, requestInfo.Subresource
	}

	active := longRunningRequestGauge.WithLabelValues(reportedVerb, resource, subresource, scope)
	active.Inc()
	defer active.Dec()
	fn()
}
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
func MonitorRequest(request *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, elapsed time.Duration) {
reportedVerb := verb
if verb == "LIST" {
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
if values := request.URL.Query()["watch"]; len(values) > 0 {
if value := strings.ToLower(values[0]); value != "0" && value != "false" {
reportedVerb = "WATCH"
}
}
}
// normalize the legacy WATCHLIST to WATCH to ensure users aren't surprised by metrics
if verb == "WATCHLIST" {
reportedVerb = "WATCH"
}
client := cleanUserAgent(utilnet.GetHTTPClient(request))
func MonitorRequest(req *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, elapsed time.Duration) {
reportedVerb := cleanVerb(verb, req)
client := cleanUserAgent(utilnet.GetHTTPClient(req))
elapsedMicroseconds := float64(elapsed / time.Microsecond)
requestCounter.WithLabelValues(reportedVerb, resource, subresource, scope, client, contentType, codeToString(httpCode)).Inc()
requestLatencies.WithLabelValues(reportedVerb, resource, subresource, scope).Observe(elapsedMicroseconds)
@@ -160,6 +171,37 @@ func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc re
})
}
// cleanScope derives the "scope" metric label for a request. The first matching
// rule wins:
//   - a non-empty Namespace yields "namespace"
//   - otherwise a non-empty Name yields "resource"
//   - otherwise a resource request yields "cluster"
//   - anything else yields the empty scope
func cleanScope(requestInfo *request.RequestInfo) string {
	switch {
	case requestInfo.Namespace != "":
		return "namespace"
	case requestInfo.Name != "":
		return "resource"
	case requestInfo.IsResourceRequest:
		return "cluster"
	default:
		// this is the empty scope
		return ""
	}
}
// cleanVerb normalizes the verb reported in metrics. verb is expected to already
// be uppercase.
//
// A LIST whose first "watch" query parameter is anything other than "0" or "false"
// (compared case-insensitively) is reported as WATCH, and the legacy WATCHLIST verb
// is always reported as WATCH. Every other verb is returned unchanged. The request
// URL is only consulted for LIST.
func cleanVerb(verb string, request *http.Request) string {
	switch verb {
	case "WATCHLIST":
		// normalize the legacy WATCHLIST to WATCH to ensure users aren't surprised by metrics
		return "WATCH"
	case "LIST":
		// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
		watchParams := request.URL.Query()["watch"]
		if len(watchParams) == 0 {
			return verb
		}
		switch strings.ToLower(watchParams[0]) {
		case "0", "false":
			return verb
		}
		return "WATCH"
	}
	return verb
}
func cleanUserAgent(ua string) string {
// We collapse all "web browser"-type user agents into one "browser" to reduce metric cardinality.
if strings.HasPrefix(ua, "Mozilla/") {