Merge pull request #4272 from a-robinson/metrics

Basic initial instrumentation of the apiserver
This commit is contained in:
Victor Marmol
2015-02-10 14:58:14 -08:00
4 changed files with 110 additions and 63 deletions

View File

@@ -78,7 +78,6 @@ func (a *APIInstaller) newWebService() *restful.WebService {
} }
func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage, ws *restful.WebService, watchHandler http.Handler, redirectHandler http.Handler, proxyHandler http.Handler) error { func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage, ws *restful.WebService, watchHandler http.Handler, redirectHandler http.Handler, proxyHandler http.Handler) error {
// Handler for standard REST verbs (GET, PUT, POST and DELETE). // Handler for standard REST verbs (GET, PUT, POST and DELETE).
restVerbHandler := restfulStripPrefix(a.prefix, a.restHandler) restVerbHandler := restfulStripPrefix(a.prefix, a.restHandler)
object := storage.New() object := storage.New()

View File

@@ -23,6 +23,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path" "path"
"strconv"
"strings" "strings"
"time" "time"
@@ -38,8 +39,40 @@ import (
"github.com/emicklei/go-restful" "github.com/emicklei/go-restful"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
) )
var (
// TODO(a-robinson): Add unit tests for the handling of these metrics once
// the upstream library supports it.
requestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "apiserver_request_count",
Help: "Counter of apiserver requests broken out for each request handler, verb, API resource, and HTTP response code.",
},
[]string{"handler", "verb", "resource", "code"},
)
requestLatencies = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "apiserver_request_latencies",
Help: "Response latency summary in microseconds for each request handler and verb.",
},
[]string{"handler", "verb"},
)
)
func init() {
prometheus.MustRegister(requestCounter)
prometheus.MustRegister(requestLatencies)
}
// monitor is a helper function for each HTTP request handler to use for
// instrumenting basic request counter and latency metrics.
func monitor(handler, verb, resource string, httpCode int, reqStart time.Time) {
requestCounter.WithLabelValues(handler, verb, resource, strconv.Itoa(httpCode)).Inc()
requestLatencies.WithLabelValues(handler, verb).Observe(float64((time.Since(reqStart)) / time.Microsecond))
}
// mux is an object that can register http handlers. // mux is an object that can register http handlers.
type Mux interface { type Mux interface {
Handle(pattern string, handler http.Handler) Handle(pattern string, handler http.Handler)
@@ -126,8 +159,9 @@ func InstallValidator(mux Mux, servers func() map[string]Server) {
// TODO: document all handlers // TODO: document all handlers
// InstallSupport registers the APIServer support functions // InstallSupport registers the APIServer support functions
func InstallSupport(mux Mux, ws *restful.WebService) { func InstallSupport(mux Mux, ws *restful.WebService) {
// TODO: convert healthz to restful and remove container arg // TODO: convert healthz and metrics to restful and remove container arg
healthz.InstallHandler(mux) healthz.InstallHandler(mux)
mux.Handle("/metrics", prometheus.Handler())
// Set up a service to return the git code version. // Set up a service to return the git code version.
ws.Path("/version") ws.Path("/version")
@@ -196,25 +230,28 @@ func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w htt
w.Write(formatted.Bytes()) w.Write(formatted.Bytes())
} }
// errorJSON renders an error to the response. // errorJSON renders an error to the response. Returns the HTTP status code of the error.
func errorJSON(err error, codec runtime.Codec, w http.ResponseWriter) { func errorJSON(err error, codec runtime.Codec, w http.ResponseWriter) int {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, codec, status, w) writeJSON(status.Code, codec, status, w)
return status.Code
} }
// errorJSONFatal renders an error to the response, and if codec fails will render plaintext // errorJSONFatal renders an error to the response, and if codec fails will render plaintext.
func errorJSONFatal(err error, codec runtime.Codec, w http.ResponseWriter) { // Returns the HTTP status code of the error.
func errorJSONFatal(err error, codec runtime.Codec, w http.ResponseWriter) int {
util.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err)) util.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := errToAPIStatus(err) status := errToAPIStatus(err)
output, err := codec.Encode(status) output, err := codec.Encode(status)
if err != nil { if err != nil {
w.WriteHeader(status.Code) w.WriteHeader(status.Code)
fmt.Fprintf(w, "%s: %s", status.Reason, status.Message) fmt.Fprintf(w, "%s: %s", status.Reason, status.Message)
return return status.Code
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status.Code) w.WriteHeader(status.Code)
w.Write(output) w.Write(output)
return status.Code
} }
// writeRawJSON writes a non-API object in JSON. // writeRawJSON writes a non-API object in JSON.

View File

@@ -18,6 +18,7 @@ package apiserver
import ( import (
"net/http" "net/http"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@@ -32,17 +33,26 @@ type RedirectHandler struct {
} }
func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var verb string
var apiResource string
var httpCode int
reqStart := time.Now()
defer func() { monitor("redirect", verb, apiResource, httpCode, reqStart) }()
requestInfo, err := r.apiRequestInfoResolver.GetAPIRequestInfo(req) requestInfo, err := r.apiRequestInfoResolver.GetAPIRequestInfo(req)
if err != nil { if err != nil {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
verb = requestInfo.Verb
resource, parts := requestInfo.Resource, requestInfo.Parts resource, parts := requestInfo.Resource, requestInfo.Parts
ctx := api.WithNamespace(api.NewContext(), requestInfo.Namespace) ctx := api.WithNamespace(api.NewContext(), requestInfo.Namespace)
// redirection requires /resource/resourceName path parts // redirection requires /resource/resourceName path parts
if len(parts) != 2 || req.Method != "GET" { if len(parts) != 2 || req.Method != "GET" {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
id := parts[1] id := parts[1]
@@ -50,13 +60,15 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !ok { if !ok {
httplog.LogOf(req, w).Addf("'%v' has no storage object", resource) httplog.LogOf(req, w).Addf("'%v' has no storage object", resource)
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
apiResource = resource
redirector, ok := storage.(Redirector) redirector, ok := storage.(Redirector)
if !ok { if !ok {
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource) httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
errorJSON(errors.NewMethodNotSupported(resource, "redirect"), r.codec, w) httpCode = errorJSON(errors.NewMethodNotSupported(resource, "redirect"), r.codec, w)
return return
} }
@@ -64,9 +76,11 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return return
} }
w.Header().Set("Location", location) w.Header().Set("Location", location)
w.WriteHeader(http.StatusTemporaryRedirect) w.WriteHeader(http.StatusTemporaryRedirect)
httpCode = http.StatusTemporaryRedirect
} }

View File

@@ -43,19 +43,30 @@ type RESTHandler struct {
// ServeHTTP handles requests to all RESTStorage objects. // ServeHTTP handles requests to all RESTStorage objects.
func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var verb string
var apiResource string
var httpCode int
reqStart := time.Now()
defer func() { monitor("rest", verb, apiResource, httpCode, reqStart) }()
requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req)
if err != nil { if err != nil {
glog.Errorf("Unable to handle request %s %s %v", requestInfo.Namespace, requestInfo.Kind, err) glog.Errorf("Unable to handle request %s %s %v", requestInfo.Namespace, requestInfo.Kind, err)
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
verb = requestInfo.Verb
storage, ok := h.storage[requestInfo.Resource] storage, ok := h.storage[requestInfo.Resource]
if !ok { if !ok {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
apiResource = requestInfo.Resource
h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource) httpCode = h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource)
} }
// Sets the SelfLink field of the object. // Sets the SelfLink field of the object.
@@ -143,11 +154,12 @@ func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(
// POST /foo create // POST /foo create
// PUT /foo/bar update 'bar' // PUT /foo/bar update 'bar'
// DELETE /foo/bar delete 'bar' // DELETE /foo/bar delete 'bar'
// Returns 404 if the method/pattern doesn't match one of these entries // Responds with a 404 if the method/pattern doesn't match one of these entries.
// The s accepts several query parameters: // The s accepts several query parameters:
// timeout=<duration> Timeout for synchronous requests // timeout=<duration> Timeout for synchronous requests
// labels=<label-selector> Used for filtering list operations // labels=<label-selector> Used for filtering list operations
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) { // Returns the HTTP status code written to the response.
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) int {
ctx := api.WithNamespace(api.NewContext(), namespace) ctx := api.WithNamespace(api.NewContext(), namespace)
// TODO: Document the timeout query parameter. // TODO: Document the timeout query parameter.
timeout := parseTimeout(req.URL.Query().Get("timeout")) timeout := parseTimeout(req.URL.Query().Get("timeout"))
@@ -157,154 +169,136 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
case 1: case 1:
label, err := labels.ParseSelector(req.URL.Query().Get("labels")) label, err := labels.ParseSelector(req.URL.Query().Get("labels"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
field, err := labels.ParseSelector(req.URL.Query().Get("fields")) field, err := labels.ParseSelector(req.URL.Query().Get("fields"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
lister, ok := storage.(RESTLister) lister, ok := storage.(RESTLister)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w)
return
} }
list, err := lister.List(ctx, label, field) list, err := lister.List(ctx, label, field)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
if err := h.setSelfLink(list, req); err != nil { if err := h.setSelfLink(list, req); err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
writeJSON(http.StatusOK, h.codec, list, w) writeJSON(http.StatusOK, h.codec, list, w)
case 2: case 2:
getter, ok := storage.(RESTGetter) getter, ok := storage.(RESTGetter)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w)
return
} }
item, err := getter.Get(ctx, parts[1]) item, err := getter.Get(ctx, parts[1])
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
if err := h.setSelfLink(item, req); err != nil { if err := h.setSelfLink(item, req); err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
writeJSON(http.StatusOK, h.codec, item, w) writeJSON(http.StatusOK, h.codec, item, w)
default: default:
notFound(w, req) notFound(w, req)
return http.StatusNotFound
} }
case "POST": case "POST":
if len(parts) != 1 { if len(parts) != 1 {
notFound(w, req) notFound(w, req)
return return http.StatusNotFound
} }
creater, ok := storage.(RESTCreater) creater, ok := storage.(RESTCreater)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w)
return
} }
body, err := readBody(req) body, err := readBody(req)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
obj := storage.New() obj := storage.New()
err = h.codec.DecodeInto(body, obj) err = h.codec.DecodeInto(body, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
// invoke admission control // invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE")) err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
out, err := creater.Create(ctx, obj) out, err := creater.Create(ctx, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req)) op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req))
h.finishReq(op, req, w) return h.finishReq(op, req, w)
case "DELETE": case "DELETE":
if len(parts) != 2 { if len(parts) != 2 {
notFound(w, req) notFound(w, req)
return return http.StatusNotFound
} }
deleter, ok := storage.(RESTDeleter) deleter, ok := storage.(RESTDeleter)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w)
return
} }
// invoke admission control // invoke admission control
err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE")) err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
out, err := deleter.Delete(ctx, parts[1]) out, err := deleter.Delete(ctx, parts[1])
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
op := h.createOperation(out, timeout, nil) op := h.createOperation(out, timeout, nil)
h.finishReq(op, req, w) return h.finishReq(op, req, w)
case "PUT": case "PUT":
if len(parts) != 2 { if len(parts) != 2 {
notFound(w, req) notFound(w, req)
return return http.StatusNotFound
} }
updater, ok := storage.(RESTUpdater) updater, ok := storage.(RESTUpdater)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w)
return
} }
body, err := readBody(req) body, err := readBody(req)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
obj := storage.New() obj := storage.New()
err = h.codec.DecodeInto(body, obj) err = h.codec.DecodeInto(body, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
// invoke admission control // invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE")) err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
out, err := updater.Update(ctx, obj) out, err := updater.Update(ctx, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
op := h.createOperation(out, timeout, curry(h.setSelfLink, req)) op := h.createOperation(out, timeout, curry(h.setSelfLink, req))
h.finishReq(op, req, w) return h.finishReq(op, req, w)
default: default:
notFound(w, req) notFound(w, req)
return http.StatusNotFound
} }
return http.StatusOK
} }
// createOperation creates an operation to process a channel response. // createOperation creates an operation to process a channel response.
@@ -316,11 +310,13 @@ func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Durati
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an // finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
// Operation to receive the result and returning its ID down the writer. // Operation to receive the result and returning its ID down the writer.
func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) { // Returns the HTTP status code written to the response.
func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) int {
result, complete := op.StatusOrResult() result, complete := op.StatusOrResult()
obj := result.Object obj := result.Object
var status int
if complete { if complete {
status := http.StatusOK status = http.StatusOK
if result.Created { if result.Created {
status = http.StatusCreated status = http.StatusCreated
} }
@@ -330,8 +326,9 @@ func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.Respons
status = stat.Code status = stat.Code
} }
} }
writeJSON(status, h.codec, obj, w)
} else { } else {
writeJSON(http.StatusAccepted, h.codec, obj, w) status = http.StatusAccepted
} }
writeJSON(status, h.codec, obj, w)
return status
} }