From 000d7bac29b9239a29531a526d382394d8d60353 Mon Sep 17 00:00:00 2001
From: Marek Grabowski
Date: Tue, 16 Jan 2018 15:48:20 +0000
Subject: [PATCH] Add a metric to track usage of inflight request limit.

---
 .../pkg/endpoints/metrics/metrics.go          | 15 ++++
 .../k8s.io/apiserver/pkg/server/filters/BUILD |  1 +
 .../pkg/server/filters/maxinflight.go         | 77 ++++++++++++++++++-
 3 files changed, 89 insertions(+), 4 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
index ec75fad3d12..c930fa278c3 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
@@ -86,6 +86,15 @@ var (
 		},
 		[]string{"requestKind"},
 	)
+	// Because of the volatility of the base metric, this is a pre-aggregated one. Instead of reporting the current usage
+	// all the time, it reports the maximal usage during the last second.
+	currentInflightRequests = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "apiserver_current_inflight_requests",
+			Help: "Maximal number of currently used inflight request limit of this apiserver per request kind in last second.",
+		},
+		[]string{"requestKind"},
+	)
 	kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
 )
 
@@ -104,6 +113,12 @@
 	prometheus.MustRegister(requestLatenciesSummary)
 	prometheus.MustRegister(responseSizes)
 	prometheus.MustRegister(DroppedRequests)
+	prometheus.MustRegister(currentInflightRequests)
+}
+
+func UpdateInflightRequestMetrics(nonmutating, mutating int) {
+	currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
+	currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
 }
 
 // Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
index 69272f623e1..8fed2379787 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
@@ -47,6 +47,7 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
index 1eb7292a4f3..84fa31c72e3 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
@@ -19,8 +19,11 @@ package filters
 import (
 	"fmt"
 	"net/http"
+	"sync"
+	"time"
 
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/authentication/user"
 	"k8s.io/apiserver/pkg/endpoints/metrics"
 	apirequest "k8s.io/apiserver/pkg/endpoints/request"
@@ -28,9 +31,16 @@
 	"github.com/golang/glog"
 )
 
-// Constant for the retry-after interval on rate limiting.
-// TODO: maybe make this dynamic? or user-adjustable?
-const retryAfter = "1"
+const (
+	// Constant for the retry-after interval on rate limiting.
+	// TODO: maybe make this dynamic? or user-adjustable?
+	retryAfter = "1"
+
+	// How often the inflight usage metric should be updated. Because
+	// the metric tracks the maximal value over a period, making this
+	// longer will increase the metric value.
+	inflightUsageMetricUpdatePeriod = time.Second
+)
 
 var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
 
@@ -40,6 +50,49 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
 	glog.Errorf(err.Error())
 }
 
+// requestWatermark is used to track maximal usage of inflight requests.
+type requestWatermark struct {
+	lock                                 sync.Mutex
+	readOnlyWatermark, mutatingWatermark int
+}
+
+func (w *requestWatermark) recordMutating(mutatingVal int) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	if w.mutatingWatermark < mutatingVal {
+		w.mutatingWatermark = mutatingVal
+	}
+}
+
+func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	if w.readOnlyWatermark < readOnlyVal {
+		w.readOnlyWatermark = readOnlyVal
+	}
+}
+
+var watermark = &requestWatermark{}
+
+func startRecordingUsage() {
+	go func() {
+		wait.Forever(func() {
+			watermark.lock.Lock()
+			readOnlyWatermark := watermark.readOnlyWatermark
+			mutatingWatermark := watermark.mutatingWatermark
+			watermark.readOnlyWatermark = 0
+			watermark.mutatingWatermark = 0
+			watermark.lock.Unlock()
+
+			metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
+		}, inflightUsageMetricUpdatePeriod)
+	}()
+}
+
+var startOnce sync.Once
+
 // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
 func WithMaxInFlightLimit(
 	handler http.Handler,
@@ -48,6 +101,7 @@
 	requestContextMapper apirequest.RequestContextMapper,
 	longRunningRequestCheck apirequest.LongRunningRequestCheck,
 ) http.Handler {
+	startOnce.Do(startRecordingUsage)
 	if nonMutatingLimit == 0 && mutatingLimit == 0 {
 		return handler
 	}
@@ -92,7 +146,22 @@
 
 				select {
 				case c <- true:
-					defer func() { <-c }()
+					var mutatingLen, readOnlyLen int
+					if isMutatingRequest {
+						mutatingLen = len(mutatingChan)
+					} else {
+						readOnlyLen = len(nonMutatingChan)
+					}
+
+					defer func() {
+						<-c
+						if isMutatingRequest {
+							watermark.recordMutating(mutatingLen)
+						} else {
+							watermark.recordReadOnly(readOnlyLen)
+						}
+
+					}()
 					handler.ServeHTTP(w, r)
 
 				default: