Merge pull request #4299 from a-robinson/metrics

Add monitoring instrumentation for the remaining HTTP handlers in the apiserver.
This commit is contained in:
Victor Marmol 2015-02-10 16:25:08 -08:00
commit 6c15604ccb
3 changed files with 42 additions and 6 deletions

View File

@ -82,16 +82,25 @@ type ProxyHandler struct {
} }
func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var verb string
var apiResource string
var httpCode int
reqStart := time.Now()
defer func() { monitor("proxy", verb, apiResource, httpCode, reqStart) }()
requestInfo, err := r.apiRequestInfoResolver.GetAPIRequestInfo(req) requestInfo, err := r.apiRequestInfoResolver.GetAPIRequestInfo(req)
if err != nil { if err != nil {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
verb = requestInfo.Verb
namespace, resource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Parts namespace, resource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Parts
ctx := api.WithNamespace(api.NewContext(), namespace) ctx := api.WithNamespace(api.NewContext(), namespace)
if len(parts) < 2 { if len(parts) < 2 {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
id := parts[1] id := parts[1]
@ -110,13 +119,15 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if !ok { if !ok {
httplog.LogOf(req, w).Addf("'%v' has no storage object", resource) httplog.LogOf(req, w).Addf("'%v' has no storage object", resource)
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
apiResource = resource
redirector, ok := storage.(Redirector) redirector, ok := storage.(Redirector)
if !ok { if !ok {
httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource) httplog.LogOf(req, w).Addf("'%v' is not a redirector", resource)
errorJSON(errors.NewMethodNotSupported(resource, "proxy"), r.codec, w) httpCode = errorJSON(errors.NewMethodNotSupported(resource, "proxy"), r.codec, w)
return return
} }
@ -125,11 +136,13 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err) httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return return
} }
if location == "" { if location == "" {
httplog.LogOf(req, w).Addf("ResourceLocation for %v returned ''", id) httplog.LogOf(req, w).Addf("ResourceLocation for %v returned ''", id)
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
@ -137,6 +150,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return return
} }
if destURL.Scheme == "" { if destURL.Scheme == "" {
@ -151,8 +165,10 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
notFound(w, req) notFound(w, req)
httpCode = status.Code
return return
} }
httpCode = http.StatusOK
newReq.Header = req.Header newReq.Header = req.Header
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: destURL.Host}) proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: destURL.Host})

View File

@ -23,6 +23,7 @@ import (
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
) )
@ -72,6 +73,10 @@ type ServerStatus struct {
} }
func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var httpCode int
reqStart := time.Now()
defer func() { monitor("validate", "get", "", httpCode, reqStart) }()
reply := []ServerStatus{} reply := []ServerStatus{}
for name, server := range v.servers() { for name, server := range v.servers() {
status, msg, err := server.check(v.client) status, msg, err := server.check(v.client)
@ -85,11 +90,13 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
data, err := json.MarshalIndent(reply, "", " ") data, err := json.MarshalIndent(reply, "", " ")
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) httpCode = http.StatusInternalServerError
w.WriteHeader(httpCode)
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
return return
} }
w.WriteHeader(http.StatusOK) httpCode = http.StatusOK
w.WriteHeader(httpCode)
w.Write(data) w.Write(data)
} }

View File

@ -22,6 +22,7 @@ import (
"path" "path"
"regexp" "regexp"
"strings" "strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -83,39 +84,51 @@ func isWebsocketRequest(req *http.Request) bool {
// ServeHTTP processes watch requests. // ServeHTTP processes watch requests.
func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var verb string
var apiResource string
var httpCode int
reqStart := time.Now()
defer func() { monitor("watch", verb, apiResource, httpCode, reqStart) }()
if req.Method != "GET" { if req.Method != "GET" {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req)
if err != nil { if err != nil {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
verb = requestInfo.Verb
ctx := api.WithNamespace(api.NewContext(), requestInfo.Namespace) ctx := api.WithNamespace(api.NewContext(), requestInfo.Namespace)
storage := h.storage[requestInfo.Resource] storage := h.storage[requestInfo.Resource]
if storage == nil { if storage == nil {
notFound(w, req) notFound(w, req)
httpCode = http.StatusNotFound
return return
} }
apiResource = requestInfo.Resource
watcher, ok := storage.(ResourceWatcher) watcher, ok := storage.(ResourceWatcher)
if !ok { if !ok {
errorJSON(errors.NewMethodNotSupported(requestInfo.Resource, "watch"), h.codec, w) httpCode = errorJSON(errors.NewMethodNotSupported(requestInfo.Resource, "watch"), h.codec, w)
return return
} }
label, field, resourceVersion, err := getWatchParams(req.URL.Query()) label, field, resourceVersion, err := getWatchParams(req.URL.Query())
if err != nil { if err != nil {
errorJSON(err, h.codec, w) httpCode = errorJSON(err, h.codec, w)
return return
} }
watching, err := watcher.Watch(ctx, label, field, resourceVersion) watching, err := watcher.Watch(ctx, label, field, resourceVersion)
if err != nil { if err != nil {
errorJSON(err, h.codec, w) httpCode = errorJSON(err, h.codec, w)
return return
} }
httpCode = http.StatusOK
// TODO: This is one watch per connection. We want to multiplex, so that // TODO: This is one watch per connection. We want to multiplex, so that
// multiple watches of the same thing don't create two watches downstream. // multiple watches of the same thing don't create two watches downstream.