mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #58342 from gmarek/inflight
Automatic merge from submit-queue (batch tested with PRs 55792, 58342). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Add a metric to track usage of inflight request limit. This one is tricky. The goal is to know how 'loaded' given apiserver is before we start dropping the load, to so we need to somehow expose 'fullness' of channels. Sadly this metric is pretty volatile so it's not clear how to do this correctly. I decided to do pre-aggregation to smoothen the metric a bit. In the current implementation the metric publishes maximum "usage" of the inflight is previous second. If you have any ideas please share. @smarterclayton @lavalamp @wojtek-t @liggitt @deads2k @caesarxuchao @sttts @crassirostris @hulkholden ```release-note NONE ```
This commit is contained in:
commit
a73c96d7b2
@ -86,6 +86,15 @@ var (
|
||||
},
|
||||
[]string{"requestKind"},
|
||||
)
|
||||
// Becasue of volatality of the base metric this is pre-aggregated one. Instead of reporing current usage all the time
|
||||
// it reports maximal usage during the last second.
|
||||
currentInflightRequests = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "apiserver_current_inflight_requests",
|
||||
Help: "Maximal mumber of currently used inflight request limit of this apiserver per request kind in last second.",
|
||||
},
|
||||
[]string{"requestKind"},
|
||||
)
|
||||
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
|
||||
)
|
||||
|
||||
@ -104,6 +113,12 @@ func init() {
|
||||
prometheus.MustRegister(requestLatenciesSummary)
|
||||
prometheus.MustRegister(responseSizes)
|
||||
prometheus.MustRegister(DroppedRequests)
|
||||
prometheus.MustRegister(currentInflightRequests)
|
||||
}
|
||||
|
||||
func UpdateInflightRequestMetrics(nonmutating, mutating int) {
|
||||
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
|
||||
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
|
||||
}
|
||||
|
||||
// Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
|
||||
|
@ -47,6 +47,7 @@ go_library(
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||
|
@ -19,8 +19,11 @@ package filters
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
@ -28,9 +31,16 @@ import (
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// Constant for the retry-after interval on rate limiting.
|
||||
// TODO: maybe make this dynamic? or user-adjustable?
|
||||
const retryAfter = "1"
|
||||
const (
|
||||
// Constant for the retry-after interval on rate limiting.
|
||||
// TODO: maybe make this dynamic? or user-adjustable?
|
||||
retryAfter = "1"
|
||||
|
||||
// How often inflight usage metric should be updated. Because
|
||||
// the metrics tracks maximal value over period making this
|
||||
// longer will increase the metric value.
|
||||
inflightUsageMetricUpdatePeriod = time.Second
|
||||
)
|
||||
|
||||
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
|
||||
|
||||
@ -40,6 +50,49 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
|
||||
glog.Errorf(err.Error())
|
||||
}
|
||||
|
||||
// requestWatermark is used to trak maximal usage of inflight requests.
|
||||
type requestWatermark struct {
|
||||
lock sync.Mutex
|
||||
readOnlyWatermark, mutatingWatermark int
|
||||
}
|
||||
|
||||
func (w *requestWatermark) recordMutating(mutatingVal int) {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
if w.mutatingWatermark < mutatingVal {
|
||||
w.mutatingWatermark = mutatingVal
|
||||
}
|
||||
}
|
||||
|
||||
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
if w.readOnlyWatermark < readOnlyVal {
|
||||
w.readOnlyWatermark = readOnlyVal
|
||||
}
|
||||
}
|
||||
|
||||
var watermark = &requestWatermark{}
|
||||
|
||||
func startRecordingUsage() {
|
||||
go func() {
|
||||
wait.Forever(func() {
|
||||
watermark.lock.Lock()
|
||||
readOnlyWatermark := watermark.readOnlyWatermark
|
||||
mutatingWatermark := watermark.mutatingWatermark
|
||||
watermark.readOnlyWatermark = 0
|
||||
watermark.mutatingWatermark = 0
|
||||
watermark.lock.Unlock()
|
||||
|
||||
metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
|
||||
}, inflightUsageMetricUpdatePeriod)
|
||||
}()
|
||||
}
|
||||
|
||||
var startOnce sync.Once
|
||||
|
||||
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
|
||||
func WithMaxInFlightLimit(
|
||||
handler http.Handler,
|
||||
@ -48,6 +101,7 @@ func WithMaxInFlightLimit(
|
||||
requestContextMapper apirequest.RequestContextMapper,
|
||||
longRunningRequestCheck apirequest.LongRunningRequestCheck,
|
||||
) http.Handler {
|
||||
startOnce.Do(startRecordingUsage)
|
||||
if nonMutatingLimit == 0 && mutatingLimit == 0 {
|
||||
return handler
|
||||
}
|
||||
@ -92,7 +146,22 @@ func WithMaxInFlightLimit(
|
||||
|
||||
select {
|
||||
case c <- true:
|
||||
defer func() { <-c }()
|
||||
var mutatingLen, readOnlyLen int
|
||||
if isMutatingRequest {
|
||||
mutatingLen = len(mutatingChan)
|
||||
} else {
|
||||
readOnlyLen = len(nonMutatingChan)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
<-c
|
||||
if isMutatingRequest {
|
||||
watermark.recordMutating(mutatingLen)
|
||||
} else {
|
||||
watermark.recordReadOnly(readOnlyLen)
|
||||
}
|
||||
|
||||
}()
|
||||
handler.ServeHTTP(w, r)
|
||||
|
||||
default:
|
||||
|
Loading…
Reference in New Issue
Block a user