mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 07:20:13 +00:00
Merge pull request #48583 from smarterclayton/record_errors
Automatic merge from submit-queue

Record 429 and timeout errors to prometheus

Allows gathering of load being shed.

Fixes #48559

@deads2k please review, there was a logic error in apiserver RequestInfo (minor, fortunately)

```release-note
Requests with the query parameter `?watch=` are treated by the API server as a request to watch, but authorization and metrics were not correctly identifying those as watch requests, instead grouping them as list calls.
```
This commit is contained in:
commit
9c86d7473a
@ -60,10 +60,12 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
var httpCode int
|
var httpCode int
|
||||||
reqStart := time.Now()
|
reqStart := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.Monitor(&verb, &apiResource, &subresource,
|
metrics.Monitor(
|
||||||
|
verb, apiResource, subresource,
|
||||||
net.GetHTTPClient(req),
|
net.GetHTTPClient(req),
|
||||||
w.Header().Get("Content-Type"),
|
w.Header().Get("Content-Type"),
|
||||||
httpCode, reqStart)
|
httpCode, reqStart,
|
||||||
|
)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
ctx, ok := r.Mapper.Get(req)
|
ctx, ok := r.Mapper.Get(req)
|
||||||
|
@ -69,11 +69,29 @@ func Register() {
|
|||||||
prometheus.MustRegister(requestLatenciesSummary)
|
prometheus.MustRegister(requestLatenciesSummary)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Monitor(verb, resource, subresource *string, client, contentType string, httpCode int, reqStart time.Time) {
|
// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be
|
||||||
|
// uppercase to be backwards compatible with existing monitoring tooling.
|
||||||
|
func Monitor(verb, resource, subresource, client, contentType string, httpCode int, reqStart time.Time) {
|
||||||
elapsed := float64((time.Since(reqStart)) / time.Microsecond)
|
elapsed := float64((time.Since(reqStart)) / time.Microsecond)
|
||||||
requestCounter.WithLabelValues(*verb, *resource, *subresource, client, contentType, codeToString(httpCode)).Inc()
|
requestCounter.WithLabelValues(verb, resource, subresource, client, contentType, codeToString(httpCode)).Inc()
|
||||||
requestLatencies.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed)
|
requestLatencies.WithLabelValues(verb, resource, subresource).Observe(elapsed)
|
||||||
requestLatenciesSummary.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed)
|
requestLatenciesSummary.WithLabelValues(verb, resource, subresource).Observe(elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
|
||||||
|
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
|
||||||
|
func MonitorRequest(request *http.Request, verb, resource, subresource, contentType string, httpCode int, reqStart time.Time) {
|
||||||
|
reportedVerb := verb
|
||||||
|
if verb == "LIST" {
|
||||||
|
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
|
||||||
|
if values := request.URL.Query()["watch"]; len(values) > 0 {
|
||||||
|
if value := strings.ToLower(values[0]); value != "0" && value != "false" {
|
||||||
|
reportedVerb = "WATCH"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
client := cleanUserAgent(utilnet.GetHTTPClient(request))
|
||||||
|
Monitor(reportedVerb, resource, subresource, client, contentType, httpCode, reqStart)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Reset() {
|
func Reset() {
|
||||||
@ -103,11 +121,7 @@ func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.R
|
|||||||
|
|
||||||
routeFunc(request, response)
|
routeFunc(request, response)
|
||||||
|
|
||||||
reportedVerb := verb
|
MonitorRequest(request.Request, verb, resource, subresource, rw.Header().Get("Content-Type"), delegate.status, now)
|
||||||
if verb == "LIST" && strings.ToLower(request.QueryParameter("watch")) == "true" {
|
|
||||||
reportedVerb = "WATCH"
|
|
||||||
}
|
|
||||||
Monitor(&reportedVerb, &resource, &subresource, cleanUserAgent(utilnet.GetHTTPClient(request.Request)), rw.Header().Get("Content-Type"), delegate.status, now)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,13 +195,18 @@ func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, er
|
|||||||
// if there's no name on the request and we thought it was a get before, then the actual verb is a list or a watch
|
// if there's no name on the request and we thought it was a get before, then the actual verb is a list or a watch
|
||||||
if len(requestInfo.Name) == 0 && requestInfo.Verb == "get" {
|
if len(requestInfo.Name) == 0 && requestInfo.Verb == "get" {
|
||||||
// Assumes v1.ListOptions
|
// Assumes v1.ListOptions
|
||||||
// Duplicates logic of Convert_Slice_string_To_bool
|
// Any query value that is not 0 or false is considered true
|
||||||
switch strings.ToLower(req.URL.Query().Get("watch")) {
|
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
|
||||||
case "false", "0", "":
|
if values := req.URL.Query()["watch"]; len(values) > 0 {
|
||||||
|
switch strings.ToLower(values[0]) {
|
||||||
|
case "false", "0":
|
||||||
requestInfo.Verb = "list"
|
requestInfo.Verb = "list"
|
||||||
default:
|
default:
|
||||||
requestInfo.Verb = "watch"
|
requestInfo.Verb = "watch"
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
requestInfo.Verb = "list"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// if there's no name on the request and we thought it was a delete before, then the actual verb is deletecollection
|
// if there's no name on the request and we thought it was a delete before, then the actual verb is deletecollection
|
||||||
if len(requestInfo.Name) == 0 && requestInfo.Verb == "delete" {
|
if len(requestInfo.Name) == 0 && requestInfo.Verb == "delete" {
|
||||||
|
@ -48,6 +48,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library",
|
||||||
],
|
],
|
||||||
|
@ -19,9 +19,12 @@ package filters
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/server/httplog"
|
"k8s.io/apiserver/pkg/server/httplog"
|
||||||
@ -94,6 +97,7 @@ func WithMaxInFlightLimit(
|
|||||||
defer func() { <-c }()
|
defer func() { <-c }()
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
default:
|
default:
|
||||||
|
metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now())
|
||||||
tooManyRequests(r, w)
|
tooManyRequests(r, w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,11 +22,13 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -39,24 +41,28 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
|
|||||||
if longRunning == nil {
|
if longRunning == nil {
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
timeoutFunc := func(req *http.Request) (<-chan time.Time, *apierrors.StatusError) {
|
timeoutFunc := func(req *http.Request) (<-chan time.Time, func(), *apierrors.StatusError) {
|
||||||
// TODO unify this with apiserver.MaxInFlightLimit
|
// TODO unify this with apiserver.MaxInFlightLimit
|
||||||
ctx, ok := requestContextMapper.Get(req)
|
ctx, ok := requestContextMapper.Get(req)
|
||||||
if !ok {
|
if !ok {
|
||||||
// if this happens, the handler chain isn't setup correctly because there is no context mapper
|
// if this happens, the handler chain isn't setup correctly because there is no context mapper
|
||||||
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout"))
|
return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout"))
|
||||||
}
|
}
|
||||||
|
|
||||||
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
// if this happens, the handler chain isn't setup correctly because there is no request info
|
// if this happens, the handler chain isn't setup correctly because there is no request info
|
||||||
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
|
return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
|
||||||
}
|
}
|
||||||
|
|
||||||
if longRunning(req, requestInfo) {
|
if longRunning(req, requestInfo) {
|
||||||
return nil, nil
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
return time.After(globalTimeout), apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
|
now := time.Now()
|
||||||
|
metricFn := func() {
|
||||||
|
metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", http.StatusInternalServerError, now)
|
||||||
|
}
|
||||||
|
return time.After(globalTimeout), metricFn, apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
|
||||||
}
|
}
|
||||||
return WithTimeout(handler, timeoutFunc)
|
return WithTimeout(handler, timeoutFunc)
|
||||||
}
|
}
|
||||||
@ -68,18 +74,19 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
|
|||||||
// provided. (If msg is empty, a suitable default message will be sent.) After
|
// provided. (If msg is empty, a suitable default message will be sent.) After
|
||||||
// the handler times out, writes by h to its http.ResponseWriter will return
|
// the handler times out, writes by h to its http.ResponseWriter will return
|
||||||
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
|
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
|
||||||
// timeout will be enforced.
|
// timeout will be enforced. recordFn is a function that will be invoked whenever
|
||||||
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, err *apierrors.StatusError)) http.Handler {
|
// a timeout happens.
|
||||||
|
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler {
|
||||||
return &timeoutHandler{h, timeoutFunc}
|
return &timeoutHandler{h, timeoutFunc}
|
||||||
}
|
}
|
||||||
|
|
||||||
type timeoutHandler struct {
|
type timeoutHandler struct {
|
||||||
handler http.Handler
|
handler http.Handler
|
||||||
timeout func(*http.Request) (<-chan time.Time, *apierrors.StatusError)
|
timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
after, err := t.timeout(r)
|
after, recordFn, err := t.timeout(r)
|
||||||
if after == nil {
|
if after == nil {
|
||||||
t.handler.ServeHTTP(w, r)
|
t.handler.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
@ -95,6 +102,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
case <-done:
|
case <-done:
|
||||||
return
|
return
|
||||||
case <-after:
|
case <-after:
|
||||||
|
recordFn()
|
||||||
tw.timeout(err)
|
tw.timeout(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -31,12 +32,30 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/diff"
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type recorder struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recorder) Record() {
|
||||||
|
r.lock.Lock()
|
||||||
|
defer r.lock.Unlock()
|
||||||
|
r.count++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recorder) Count() int {
|
||||||
|
r.lock.Lock()
|
||||||
|
defer r.lock.Unlock()
|
||||||
|
return r.count
|
||||||
|
}
|
||||||
|
|
||||||
func TestTimeout(t *testing.T) {
|
func TestTimeout(t *testing.T) {
|
||||||
sendResponse := make(chan struct{}, 1)
|
sendResponse := make(chan struct{}, 1)
|
||||||
writeErrors := make(chan error, 1)
|
writeErrors := make(chan error, 1)
|
||||||
timeout := make(chan time.Time, 1)
|
timeout := make(chan time.Time, 1)
|
||||||
resp := "test response"
|
resp := "test response"
|
||||||
timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0)
|
timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0)
|
||||||
|
record := &recorder{}
|
||||||
|
|
||||||
ts := httptest.NewServer(WithTimeout(http.HandlerFunc(
|
ts := httptest.NewServer(WithTimeout(http.HandlerFunc(
|
||||||
func(w http.ResponseWriter, r *http.Request) {
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -44,8 +63,8 @@ func TestTimeout(t *testing.T) {
|
|||||||
_, err := w.Write([]byte(resp))
|
_, err := w.Write([]byte(resp))
|
||||||
writeErrors <- err
|
writeErrors <- err
|
||||||
}),
|
}),
|
||||||
func(*http.Request) (<-chan time.Time, *apierrors.StatusError) {
|
func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) {
|
||||||
return timeout, timeoutErr
|
return timeout, record.Record, timeoutErr
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
@ -65,6 +84,9 @@ func TestTimeout(t *testing.T) {
|
|||||||
if err := <-writeErrors; err != nil {
|
if err := <-writeErrors; err != nil {
|
||||||
t.Errorf("got unexpected Write error on first request: %v", err)
|
t.Errorf("got unexpected Write error on first request: %v", err)
|
||||||
}
|
}
|
||||||
|
if record.Count() != 0 {
|
||||||
|
t.Errorf("invoked record method: %#v", record)
|
||||||
|
}
|
||||||
|
|
||||||
// Times out
|
// Times out
|
||||||
timeout <- time.Time{}
|
timeout <- time.Time{}
|
||||||
@ -83,6 +105,9 @@ func TestTimeout(t *testing.T) {
|
|||||||
if !reflect.DeepEqual(status, &timeoutErr.ErrStatus) {
|
if !reflect.DeepEqual(status, &timeoutErr.ErrStatus) {
|
||||||
t.Errorf("unexpected object: %s", diff.ObjectReflectDiff(&timeoutErr.ErrStatus, status))
|
t.Errorf("unexpected object: %s", diff.ObjectReflectDiff(&timeoutErr.ErrStatus, status))
|
||||||
}
|
}
|
||||||
|
if record.Count() != 1 {
|
||||||
|
t.Errorf("did not invoke record method: %#v", record)
|
||||||
|
}
|
||||||
|
|
||||||
// Now try to send a response
|
// Now try to send a response
|
||||||
sendResponse <- struct{}{}
|
sendResponse <- struct{}{}
|
||||||
|
Loading…
Reference in New Issue
Block a user