Basic initial instrumentation of the apiserver. This links in the

prometheus library for monitoring, which exports some basic resource
usage metrics by default, like number of goroutines, open file
descriptors, resident and virtual memory, etc. I've also started adding
in request counters and latency histograms, but have only added them to two
of our HTTP handlers. If this looks reasonable, I'll add them to the rest
in a second PR.
This commit is contained in:
Alex Robinson
2015-02-10 01:04:37 +00:00
parent 2c2a59568c
commit 2463cff79c
4 changed files with 130 additions and 63 deletions

View File

@@ -78,7 +78,6 @@ func (a *APIInstaller) newWebService() *restful.WebService {
} }
func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage, ws *restful.WebService, watchHandler http.Handler, redirectHandler http.Handler, proxyHandler http.Handler) error { func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage, ws *restful.WebService, watchHandler http.Handler, redirectHandler http.Handler, proxyHandler http.Handler) error {
// Handler for standard REST verbs (GET, PUT, POST and DELETE). // Handler for standard REST verbs (GET, PUT, POST and DELETE).
restVerbHandler := restfulStripPrefix(a.prefix, a.restHandler) restVerbHandler := restfulStripPrefix(a.prefix, a.restHandler)
object := storage.New() object := storage.New()

View File

@@ -38,8 +38,23 @@ import (
"github.com/emicklei/go-restful" "github.com/emicklei/go-restful"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
) )
var (
apiserverLatencies = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "apiserver_request_latencies",
Help: "Response latency summary in microseconds for each request handler, verb, and HTTP response code.",
},
[]string{"handler", "verb", "code"},
)
)
func init() {
prometheus.MustRegister(apiserverLatencies)
}
// mux is an object that can register http handlers. // mux is an object that can register http handlers.
type Mux interface { type Mux interface {
Handle(pattern string, handler http.Handler) Handle(pattern string, handler http.Handler)
@@ -126,8 +141,9 @@ func InstallValidator(mux Mux, servers func() map[string]Server) {
// TODO: document all handlers // TODO: document all handlers
// InstallSupport registers the APIServer support functions // InstallSupport registers the APIServer support functions
func InstallSupport(mux Mux, ws *restful.WebService) { func InstallSupport(mux Mux, ws *restful.WebService) {
// TODO: convert healthz to restful and remove container arg // TODO: convert healthz and metrics to restful and remove container arg
healthz.InstallHandler(mux) healthz.InstallHandler(mux)
mux.Handle("/metrics", prometheus.Handler())
// Set up a service to return the git code version. // Set up a service to return the git code version.
ws.Path("/version") ws.Path("/version")
@@ -196,25 +212,28 @@ func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w htt
w.Write(formatted.Bytes()) w.Write(formatted.Bytes())
} }
// errorJSON renders an error to the response. // errorJSON renders an error to the response. Returns the HTTP status code of the error.
func errorJSON(err error, codec runtime.Codec, w http.ResponseWriter) { func errorJSON(err error, codec runtime.Codec, w http.ResponseWriter) int {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, codec, status, w) writeJSON(status.Code, codec, status, w)
return status.Code
} }
// errorJSONFatal renders an error to the response, and if codec fails will render plaintext // errorJSONFatal renders an error to the response, and if codec fails will render plaintext.
func errorJSONFatal(err error, codec runtime.Codec, w http.ResponseWriter) { // Returns the HTTP status code of the error.
func errorJSONFatal(err error, codec runtime.Codec, w http.ResponseWriter) int {
util.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err)) util.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
status := errToAPIStatus(err) status := errToAPIStatus(err)
output, err := codec.Encode(status) output, err := codec.Encode(status)
if err != nil { if err != nil {
w.WriteHeader(status.Code) w.WriteHeader(status.Code)
fmt.Fprintf(w, "%s: %s", status.Reason, status.Message) fmt.Fprintf(w, "%s: %s", status.Reason, status.Message)
return return status.Code
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status.Code) w.WriteHeader(status.Code)
w.Write(output) w.Write(output)
return status.Code
} }
// writeRawJSON writes a non-API object in JSON. // writeRawJSON writes a non-API object in JSON.

View File

@@ -18,13 +18,31 @@ package apiserver
import ( import (
"net/http" "net/http"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/prometheus/client_golang/prometheus"
) )
var (
redirectCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "apiserver_redirect_count",
Help: "Counter of redirect requests broken out by apiserver resource and HTTP response code.",
},
[]string{"resource", "code"},
)
)
func init() {
prometheus.MustRegister(redirectCounter)
}
type RedirectHandler struct { type RedirectHandler struct {
storage map[string]RESTStorage storage map[string]RESTStorage
codec runtime.Codec codec runtime.Codec
@@ -32,9 +50,18 @@ type RedirectHandler struct {
} }
func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
apiResource := ""
var httpCode int
reqStart := time.Now()
defer func() {
redirectCounter.WithLabelValues(apiResource, strconv.Itoa(httpCode)).Inc()
apiserverLatencies.WithLabelValues("redirect", "get", strconv.Itoa(httpCode)).Observe(float64((time.Since(reqStart)) / time.Microsecond))
}()
requestInfo, err := r.apiRequestInfoResolver.GetAPIRequestInfo(req) requestInfo, err := r.apiRequestInfoResolver.GetAPIRequestInfo(req)
if err != nil { if err != nil {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
resource, parts := requestInfo.Resource, requestInfo.Parts resource, parts := requestInfo.Resource, requestInfo.Parts
@@ -43,6 +70,7 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// redirection requires /resource/resourceName path parts // redirection requires /resource/resourceName path parts
if len(parts) != 2 || req.Method != "GET" { if len(parts) != 2 || req.Method != "GET" {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
id := parts[1] id := parts[1]
@@ -50,13 +78,16 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !ok { if !ok {
httplog.LogOf(req, w).Addf("'%v' has no storage object", resource) httplog.LogOf(req, w).Addf("'%v' has no storage object", resource)
notFound(w, req) notFound(w, req)
apiResource = "invalidResource"
httpCode = http.StatusNotFound
return return
} }
apiResource = resource
redirector, ok := storage.(Redirector) redirector, ok := storage.(Redirector)
if !ok { if !ok {
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource) httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
errorJSON(errors.NewMethodNotSupported(resource, "redirect"), r.codec, w) httpCode = errorJSON(errors.NewMethodNotSupported(resource, "redirect"), r.codec, w)
return return
} }
@@ -64,9 +95,11 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return return
} }
w.Header().Set("Location", location) w.Header().Set("Location", location)
w.WriteHeader(http.StatusTemporaryRedirect) w.WriteHeader(http.StatusTemporaryRedirect)
httpCode = http.StatusTemporaryRedirect
} }

View File

@@ -19,6 +19,7 @@ package apiserver
import ( import (
"net/http" "net/http"
"path" "path"
"strconv"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
@@ -28,8 +29,23 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
) )
var (
restCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "apiserver_rest_count",
Help: "Counter of REST requests broken out for each verb, apiserver resource, and HTTP response code.",
},
[]string{"verb", "resource", "code"},
)
)
func init() {
prometheus.MustRegister(restCounter)
}
// RESTHandler implements HTTP verbs on a set of RESTful resources identified by name. // RESTHandler implements HTTP verbs on a set of RESTful resources identified by name.
type RESTHandler struct { type RESTHandler struct {
storage map[string]RESTStorage storage map[string]RESTStorage
@@ -43,18 +59,32 @@ type RESTHandler struct {
// ServeHTTP handles requests to all RESTStorage objects. // ServeHTTP handles requests to all RESTStorage objects.
func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
verb := ""
apiResource := ""
var httpCode int
reqStart := time.Now()
defer func() {
restCounter.WithLabelValues(verb, apiResource, strconv.Itoa(httpCode)).Inc()
apiserverLatencies.WithLabelValues("rest", verb, strconv.Itoa(httpCode)).Observe(float64((time.Since(reqStart)) / time.Microsecond))
}()
requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req)
if err != nil { if err != nil {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
verb = requestInfo.Verb
storage, ok := h.storage[requestInfo.Resource] storage, ok := h.storage[requestInfo.Resource]
if !ok { if !ok {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
apiResource = requestInfo.Resource
h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource) httpCode = h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource)
} }
// Sets the SelfLink field of the object. // Sets the SelfLink field of the object.
@@ -142,11 +172,12 @@ func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(
// POST /foo create // POST /foo create
// PUT /foo/bar update 'bar' // PUT /foo/bar update 'bar'
// DELETE /foo/bar delete 'bar' // DELETE /foo/bar delete 'bar'
// Returns 404 if the method/pattern doesn't match one of these entries // Responds with a 404 if the method/pattern doesn't match one of these entries.
// The s accepts several query parameters: // The s accepts several query parameters:
// timeout=<duration> Timeout for synchronous requests // timeout=<duration> Timeout for synchronous requests
// labels=<label-selector> Used for filtering list operations // labels=<label-selector> Used for filtering list operations
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) { // Returns the HTTP status code written to the response.
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) int {
ctx := api.WithNamespace(api.NewContext(), namespace) ctx := api.WithNamespace(api.NewContext(), namespace)
// TODO: Document the timeout query parameter. // TODO: Document the timeout query parameter.
timeout := parseTimeout(req.URL.Query().Get("timeout")) timeout := parseTimeout(req.URL.Query().Get("timeout"))
@@ -156,154 +187,136 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
case 1: case 1:
label, err := labels.ParseSelector(req.URL.Query().Get("labels")) label, err := labels.ParseSelector(req.URL.Query().Get("labels"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
field, err := labels.ParseSelector(req.URL.Query().Get("fields")) field, err := labels.ParseSelector(req.URL.Query().Get("fields"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
lister, ok := storage.(RESTLister) lister, ok := storage.(RESTLister)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w)
return
} }
list, err := lister.List(ctx, label, field) list, err := lister.List(ctx, label, field)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
if err := h.setSelfLink(list, req); err != nil { if err := h.setSelfLink(list, req); err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
writeJSON(http.StatusOK, h.codec, list, w) writeJSON(http.StatusOK, h.codec, list, w)
case 2: case 2:
getter, ok := storage.(RESTGetter) getter, ok := storage.(RESTGetter)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w)
return
} }
item, err := getter.Get(ctx, parts[1]) item, err := getter.Get(ctx, parts[1])
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
if err := h.setSelfLink(item, req); err != nil { if err := h.setSelfLink(item, req); err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
writeJSON(http.StatusOK, h.codec, item, w) writeJSON(http.StatusOK, h.codec, item, w)
default: default:
notFound(w, req) notFound(w, req)
return http.StatusNotFound
} }
case "POST": case "POST":
if len(parts) != 1 { if len(parts) != 1 {
notFound(w, req) notFound(w, req)
return return http.StatusNotFound
} }
creater, ok := storage.(RESTCreater) creater, ok := storage.(RESTCreater)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w)
return
} }
body, err := readBody(req) body, err := readBody(req)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
obj := storage.New() obj := storage.New()
err = h.codec.DecodeInto(body, obj) err = h.codec.DecodeInto(body, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
// invoke admission control // invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE")) err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
out, err := creater.Create(ctx, obj) out, err := creater.Create(ctx, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req)) op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req))
h.finishReq(op, req, w) return h.finishReq(op, req, w)
case "DELETE": case "DELETE":
if len(parts) != 2 { if len(parts) != 2 {
notFound(w, req) notFound(w, req)
return return http.StatusNotFound
} }
deleter, ok := storage.(RESTDeleter) deleter, ok := storage.(RESTDeleter)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w)
return
} }
// invoke admission control // invoke admission control
err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE")) err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
out, err := deleter.Delete(ctx, parts[1]) out, err := deleter.Delete(ctx, parts[1])
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
op := h.createOperation(out, timeout, nil) op := h.createOperation(out, timeout, nil)
h.finishReq(op, req, w) return h.finishReq(op, req, w)
case "PUT": case "PUT":
if len(parts) != 2 { if len(parts) != 2 {
notFound(w, req) notFound(w, req)
return return http.StatusNotFound
} }
updater, ok := storage.(RESTUpdater) updater, ok := storage.(RESTUpdater)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w)
return
} }
body, err := readBody(req) body, err := readBody(req)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
obj := storage.New() obj := storage.New()
err = h.codec.DecodeInto(body, obj) err = h.codec.DecodeInto(body, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
// invoke admission control // invoke admission control
err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE")) err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE"))
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
out, err := updater.Update(ctx, obj) out, err := updater.Update(ctx, obj)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) return errorJSON(err, h.codec, w)
return
} }
op := h.createOperation(out, timeout, curry(h.setSelfLink, req)) op := h.createOperation(out, timeout, curry(h.setSelfLink, req))
h.finishReq(op, req, w) return h.finishReq(op, req, w)
default: default:
notFound(w, req) notFound(w, req)
return http.StatusNotFound
} }
return http.StatusOK
} }
// createOperation creates an operation to process a channel response. // createOperation creates an operation to process a channel response.
@@ -315,11 +328,13 @@ func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Durati
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an // finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
// Operation to receive the result and returning its ID down the writer. // Operation to receive the result and returning its ID down the writer.
func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) { // Returns the HTTP status code written to the response.
func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) int {
result, complete := op.StatusOrResult() result, complete := op.StatusOrResult()
obj := result.Object obj := result.Object
var status int
if complete { if complete {
status := http.StatusOK status = http.StatusOK
if result.Created { if result.Created {
status = http.StatusCreated status = http.StatusCreated
} }
@@ -329,8 +344,9 @@ func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.Respons
status = stat.Code status = stat.Code
} }
} }
writeJSON(status, h.codec, obj, w)
} else { } else {
writeJSON(http.StatusAccepted, h.codec, obj, w) status = http.StatusAccepted
} }
writeJSON(status, h.codec, obj, w)
return status
} }