mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 05:21:58 +00:00
Add a metric to track usage of inflight request limit.
This commit is contained in:
parent
dd272ea3fd
commit
000d7bac29
@ -86,6 +86,15 @@ var (
|
|||||||
},
|
},
|
||||||
[]string{"requestKind"},
|
[]string{"requestKind"},
|
||||||
)
|
)
|
||||||
|
// Becasue of volatality of the base metric this is pre-aggregated one. Instead of reporing current usage all the time
|
||||||
|
// it reports maximal usage during the last second.
|
||||||
|
currentInflightRequests = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "apiserver_current_inflight_requests",
|
||||||
|
Help: "Maximal mumber of currently used inflight request limit of this apiserver per request kind in last second.",
|
||||||
|
},
|
||||||
|
[]string{"requestKind"},
|
||||||
|
)
|
||||||
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
|
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -104,6 +113,12 @@ func init() {
|
|||||||
prometheus.MustRegister(requestLatenciesSummary)
|
prometheus.MustRegister(requestLatenciesSummary)
|
||||||
prometheus.MustRegister(responseSizes)
|
prometheus.MustRegister(responseSizes)
|
||||||
prometheus.MustRegister(DroppedRequests)
|
prometheus.MustRegister(DroppedRequests)
|
||||||
|
prometheus.MustRegister(currentInflightRequests)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpdateInflightRequestMetrics(nonmutating, mutating int) {
|
||||||
|
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
|
||||||
|
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
|
// Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
|
||||||
|
@ -47,6 +47,7 @@ go_library(
|
|||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||||
|
@ -19,8 +19,11 @@ package filters
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
@ -28,9 +31,16 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Constant for the retry-after interval on rate limiting.
|
const (
|
||||||
// TODO: maybe make this dynamic? or user-adjustable?
|
// Constant for the retry-after interval on rate limiting.
|
||||||
const retryAfter = "1"
|
// TODO: maybe make this dynamic? or user-adjustable?
|
||||||
|
retryAfter = "1"
|
||||||
|
|
||||||
|
// How often inflight usage metric should be updated. Because
|
||||||
|
// the metrics tracks maximal value over period making this
|
||||||
|
// longer will increase the metric value.
|
||||||
|
inflightUsageMetricUpdatePeriod = time.Second
|
||||||
|
)
|
||||||
|
|
||||||
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
|
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
|
||||||
|
|
||||||
@ -40,6 +50,49 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
|
|||||||
glog.Errorf(err.Error())
|
glog.Errorf(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// requestWatermark is used to trak maximal usage of inflight requests.
|
||||||
|
type requestWatermark struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
readOnlyWatermark, mutatingWatermark int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *requestWatermark) recordMutating(mutatingVal int) {
|
||||||
|
w.lock.Lock()
|
||||||
|
defer w.lock.Unlock()
|
||||||
|
|
||||||
|
if w.mutatingWatermark < mutatingVal {
|
||||||
|
w.mutatingWatermark = mutatingVal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
|
||||||
|
w.lock.Lock()
|
||||||
|
defer w.lock.Unlock()
|
||||||
|
|
||||||
|
if w.readOnlyWatermark < readOnlyVal {
|
||||||
|
w.readOnlyWatermark = readOnlyVal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var watermark = &requestWatermark{}
|
||||||
|
|
||||||
|
func startRecordingUsage() {
|
||||||
|
go func() {
|
||||||
|
wait.Forever(func() {
|
||||||
|
watermark.lock.Lock()
|
||||||
|
readOnlyWatermark := watermark.readOnlyWatermark
|
||||||
|
mutatingWatermark := watermark.mutatingWatermark
|
||||||
|
watermark.readOnlyWatermark = 0
|
||||||
|
watermark.mutatingWatermark = 0
|
||||||
|
watermark.lock.Unlock()
|
||||||
|
|
||||||
|
metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
|
||||||
|
}, inflightUsageMetricUpdatePeriod)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var startOnce sync.Once
|
||||||
|
|
||||||
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
|
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
|
||||||
func WithMaxInFlightLimit(
|
func WithMaxInFlightLimit(
|
||||||
handler http.Handler,
|
handler http.Handler,
|
||||||
@ -48,6 +101,7 @@ func WithMaxInFlightLimit(
|
|||||||
requestContextMapper apirequest.RequestContextMapper,
|
requestContextMapper apirequest.RequestContextMapper,
|
||||||
longRunningRequestCheck apirequest.LongRunningRequestCheck,
|
longRunningRequestCheck apirequest.LongRunningRequestCheck,
|
||||||
) http.Handler {
|
) http.Handler {
|
||||||
|
startOnce.Do(startRecordingUsage)
|
||||||
if nonMutatingLimit == 0 && mutatingLimit == 0 {
|
if nonMutatingLimit == 0 && mutatingLimit == 0 {
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
@ -92,7 +146,22 @@ func WithMaxInFlightLimit(
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case c <- true:
|
case c <- true:
|
||||||
defer func() { <-c }()
|
var mutatingLen, readOnlyLen int
|
||||||
|
if isMutatingRequest {
|
||||||
|
mutatingLen = len(mutatingChan)
|
||||||
|
} else {
|
||||||
|
readOnlyLen = len(nonMutatingChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
<-c
|
||||||
|
if isMutatingRequest {
|
||||||
|
watermark.recordMutating(mutatingLen)
|
||||||
|
} else {
|
||||||
|
watermark.recordReadOnly(readOnlyLen)
|
||||||
|
}
|
||||||
|
|
||||||
|
}()
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
Loading…
Reference in New Issue
Block a user