diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index a2ca78e5e4d..8d55f1deba4 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/apiserver/metrics" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" watchjson "k8s.io/kubernetes/pkg/watch/json" @@ -456,7 +457,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } for _, action := range actions { reqScope.Namer = action.Namer - m := monitorFilter(action.Verb, resource) namespaced := "" if strings.Contains(action.Path, scope.ArgumentName()) { namespaced = "Namespaced" @@ -469,12 +469,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } else { handler = GetResource(getter, exporter, reqScope) } + handler = metrics.InstrumentRouteFunc(action.Verb, resource, handler) doc := "read the specified " + kind if hasSubresource { doc = "read " + subresource + " of the specified " + kind } route := ws.GET(action.Path).To(handler). - Filter(m). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("read"+namespaced+kind+strings.Title(subresource)). @@ -498,8 +498,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "list " + subresource + " of objects of kind " + kind } - route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)). - Filter(m). + handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) + route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("list"+namespaced+kind+strings.Title(subresource)). @@ -530,8 +530,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "replace " + subresource + " of the specified " + kind } - route := ws.PUT(action.Path).To(UpdateResource(updater, reqScope, a.group.Typer, admit)). - Filter(m). + handler := metrics.InstrumentRouteFunc(action.Verb, resource, UpdateResource(updater, reqScope, a.group.Typer, admit)) + route := ws.PUT(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("replace"+namespaced+kind+strings.Title(subresource)). @@ -546,8 +546,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "partially update " + subresource + " of the specified " + kind } - route := ws.PATCH(action.Path).To(PatchResource(patcher, reqScope, a.group.Typer, admit, mapping.ObjectConvertor)). - Filter(m). + handler := metrics.InstrumentRouteFunc(action.Verb, resource, PatchResource(patcher, reqScope, a.group.Typer, admit, mapping.ObjectConvertor)) + route := ws.PATCH(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Consumes(string(api.JSONPatchType), string(api.MergePatchType), string(api.StrategicMergePatchType)). @@ -565,12 +565,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } else { handler = CreateResource(creater, reqScope, a.group.Typer, admit) } + handler = metrics.InstrumentRouteFunc(action.Verb, resource, handler) doc := "create a " + kind if hasSubresource { doc = "create " + subresource + " of a " + kind } route := ws.POST(action.Path).To(handler). - Filter(m). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("create"+namespaced+kind+strings.Title(subresource)). @@ -585,8 +585,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "delete " + subresource + " of a " + kind } - route := ws.DELETE(action.Path).To(DeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)). - Filter(m). + handler := metrics.InstrumentRouteFunc(action.Verb, resource, DeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)) + route := ws.DELETE(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("delete"+namespaced+kind+strings.Title(subresource)). @@ -603,8 +603,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "delete collection of " + subresource + " of a " + kind } - route := ws.DELETE(action.Path).To(DeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)). - Filter(m). + handler := metrics.InstrumentRouteFunc(action.Verb, resource, DeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)) + route := ws.DELETE(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("deletecollection"+namespaced+kind+strings.Title(subresource)). @@ -622,8 +622,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "watch changes to " + subresource + " of an object of kind " + kind } - route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)). - Filter(m). + handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) + route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("watch"+namespaced+kind+strings.Title(subresource)). @@ -641,8 +641,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "watch individual changes to a list of " + subresource + " of " + kind } - route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)). - Filter(m). + handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) + route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("watch"+namespaced+kind+strings.Title(subresource)+"List"). @@ -668,9 +668,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "connect " + method + " requests to " + subresource + " of " + kind } + handler := metrics.InstrumentRouteFunc(action.Verb, resource, ConnectResource(connecter, reqScope, admit, path)) route := ws.Method(method).Path(action.Path). - To(ConnectResource(connecter, reqScope, admit, path)). - Filter(m). + To(handler). Doc(doc). Operation("connect" + strings.Title(strings.ToLower(method)) + namespaced + kind + strings.Title(subresource)). Produces("*/*"). @@ -856,8 +856,8 @@ func addProxyRoute(ws *restful.WebService, method string, prefix string, path st if hasSubresource { doc = "proxy " + method + " requests to " + subresource + " of " + kind } - proxyRoute := ws.Method(method).Path(path).To(routeFunction(proxyHandler)). - Filter(monitorFilter("PROXY", resource)). + handler := metrics.InstrumentRouteFunc("PROXY", resource, routeFunction(proxyHandler)) + proxyRoute := ws.Method(method).Path(path).To(handler). Doc(doc). Operation("proxy" + strings.Title(method) + namespaced + kind + strings.Title(subresource)). Produces("*/*"). diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 79ddfb00664..8700e106043 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -40,7 +40,6 @@ import ( "k8s.io/kubernetes/pkg/runtime" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/flushwriter" - utilnet "k8s.io/kubernetes/pkg/util/net" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wsstream" "k8s.io/kubernetes/pkg/version" @@ -54,16 +53,6 @@ func init() { metrics.Register() } -// monitorFilter creates a filter that reports the metrics for a given resource and action. -func monitorFilter(action, resource string) restful.FilterFunction { - return func(req *restful.Request, res *restful.Response, chain *restful.FilterChain) { - reqStart := time.Now() - chain.ProcessFilter(req, res) - httpCode := res.StatusCode() - metrics.Monitor(&action, &resource, utilnet.GetHTTPClient(req.Request), &httpCode, reqStart) - } -} - // mux is an object that can register http handlers. type Mux interface { Handle(pattern string, handler http.Handler) diff --git a/pkg/apiserver/metrics/metrics.go b/pkg/apiserver/metrics/metrics.go index 5c6c4797416..ec7497c4baa 100644 --- a/pkg/apiserver/metrics/metrics.go +++ b/pkg/apiserver/metrics/metrics.go @@ -17,9 +17,15 @@ limitations under the License. package metrics import ( + "bufio" + "net" + "net/http" "strconv" "time" + utilnet "k8s.io/kubernetes/pkg/util/net" + + "github.com/emicklei/go-restful" "github.com/prometheus/client_golang/prometheus" ) @@ -61,7 +67,7 @@ func Register() { } func Monitor(verb, resource *string, client string, httpCode *int, reqStart time.Time) { - requestCounter.WithLabelValues(*verb, *resource, client, strconv.Itoa(*httpCode)).Inc() + requestCounter.WithLabelValues(*verb, *resource, client, codeToString(*httpCode)).Inc() requestLatencies.WithLabelValues(*verb, *resource).Observe(float64((time.Since(reqStart)) / time.Microsecond)) requestLatenciesSummary.WithLabelValues(*verb, *resource).Observe(float64((time.Since(reqStart)) / time.Microsecond)) } @@ -71,3 +77,172 @@ func Reset() { requestLatencies.Reset() requestLatenciesSummary.Reset() } + +// InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps +// the go-restful RouteFunction instead of a HandlerFunc +func InstrumentRouteFunc(verb, resource string, routeFunc restful.RouteFunction) restful.RouteFunction { + return restful.RouteFunction(func(request *restful.Request, response *restful.Response) { + now := time.Now() + + delegate := &responseWriterDelegator{ResponseWriter: response.ResponseWriter} + + _, cn := response.ResponseWriter.(http.CloseNotifier) + _, fl := response.ResponseWriter.(http.Flusher) + _, hj := response.ResponseWriter.(http.Hijacker) + var rw http.ResponseWriter + if cn && fl && hj { + rw = &fancyResponseWriterDelegator{delegate} + } else { + rw = delegate + } + response.ResponseWriter = rw + + routeFunc(request, response) + + elapsed := float64(time.Since(now)) / float64(time.Microsecond) + requestCounter.WithLabelValues(verb, resource, utilnet.GetHTTPClient(request.Request), codeToString(delegate.status)).Inc() + requestLatencies.WithLabelValues(verb, resource).Observe(elapsed) + requestLatenciesSummary.WithLabelValues(verb, resource).Observe(elapsed) + }) +} + +type responseWriterDelegator struct { + http.ResponseWriter + + status int + written int64 + wroteHeader bool +} + +func (r *responseWriterDelegator) WriteHeader(code int) { + r.status = code + r.wroteHeader = true + r.ResponseWriter.WriteHeader(code) +} + +func (r *responseWriterDelegator) Write(b []byte) (int, error) { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + n, err := r.ResponseWriter.Write(b) + r.written += int64(n) + return n, err +} + +type fancyResponseWriterDelegator struct { + *responseWriterDelegator +} + +func (f *fancyResponseWriterDelegator) CloseNotify() <-chan bool { + return f.ResponseWriter.(http.CloseNotifier).CloseNotify() +} + +func (f *fancyResponseWriterDelegator) Flush() { + f.ResponseWriter.(http.Flusher).Flush() +} + +func (f *fancyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return f.ResponseWriter.(http.Hijacker).Hijack() +} + +// Small optimization over Itoa +func codeToString(s int) string { + switch s { + case 100: + return "100" + case 101: + return "101" + + case 200: + return "200" + case 201: + return "201" + case 202: + return "202" + case 203: + return "203" + case 204: + return "204" + case 205: + return "205" + case 206: + return "206" + + case 300: + return "300" + case 301: + return "301" + case 302: + return "302" + case 304: + return "304" + case 305: + return "305" + case 307: + return "307" + + case 400: + return "400" + case 401: + return "401" + case 402: + return "402" + case 403: + return "403" + case 404: + return "404" + case 405: + return "405" + case 406: + return "406" + case 407: + return "407" + case 408: + return "408" + case 409: + return "409" + case 410: + return "410" + case 411: + return "411" + case 412: + return "412" + case 413: + return "413" + case 414: + return "414" + case 415: + return "415" + case 416: + return "416" + case 417: + return "417" + case 418: + return "418" + + case 500: + return "500" + case 501: + return "501" + case 502: + return "502" + case 503: + return "503" + case 504: + return "504" + case 505: + return "505" + + case 428: + return "428" + case 429: + return "429" + case 431: + return "431" + case 511: + return "511" + + default: + return strconv.Itoa(s) + } +}