Merge pull request #88714 from MikeSpreitzer/apf-finer-metrics2
Extend API Priority and Fairness metrics
Commit d90b37f16e
@@ -648,7 +648,7 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
 	}
 	startWaitingTime = time.Now()
 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
-	req, idle := plState.queues.StartRequest(ctx, hashValue, rd.RequestInfo, rd.User)
+	req, idle := plState.queues.StartRequest(ctx, hashValue, fs.Name, rd.RequestInfo, rd.User)
 	if idle {
 		cfgCtl.maybeReapLocked(plName, plState)
 	}

@@ -21,13 +21,6 @@ import (
 	"strconv"
 	"time"

-	// TODO: decide whether to use the existing metrics, which
-	// categorize according to mutating vs readonly, or make new
-	// metrics because this filter does not pay attention to that
-	// distinction
-
-	// "k8s.io/apiserver/pkg/endpoints/metrics"
-
 	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apiserver/pkg/util/flowcontrol/counter"
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"

@@ -109,6 +102,7 @@ func (cfgCtl *configController) Handle(ctx context.Context, requestDigest Reques
 	if queued {
 		metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
 	}
+	metrics.AddDispatch(pl.Name, fs.Name)
 	executed = true
 	startExecutionTime := time.Now()
 	execFn()
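The Handle hunk above pins down the accounting order: waiting time is observed once the queuing outcome is known, and the new per-FlowSchema dispatch counter is bumped just before execution. A minimal sketch of that ordering, using hypothetical stand-ins for the metrics calls (the real helpers also take priorityLevel and flowSchema label arguments):

    package main

    import (
        "fmt"
        "time"
    )

    // Hypothetical stand-ins for metrics.ObserveWaitingDuration and
    // metrics.AddDispatch; illustrative only, not the k8s functions.
    func observeWaitingDuration(d time.Duration) { fmt.Println("waited:", d) }
    func addDispatch()                           { fmt.Println("dispatched") }

    // handle sketches the order used in the diff: observe the wait (if the
    // request was queued), count the dispatch, then execute.
    func handle(queued bool, startWaitingTime time.Time, execFn func()) {
        if queued {
            observeWaitingDuration(time.Since(startWaitingTime))
        }
        addDispatch()
        execFn()
    }

    func main() {
        handle(true, time.Now().Add(-50*time.Millisecond), func() { fmt.Println("executing") })
    }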
@@ -115,11 +115,11 @@ func (cqs *ctlTestQueueSet) IsIdle() bool {
 	return cqs.countActive == 0
 }

-func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (req fq.Request, idle bool) {
+func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) {
 	cqs.cts.lock.Lock()
 	defer cqs.cts.lock.Unlock()
 	cqs.countActive++
-	cqs.cts.t.Logf("Queued %#+v %#+v for %p QS=%s, countActive:=%d", descr1, descr2, cqs, cqs.qc.Name, cqs.countActive)
+	cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive)
 	return &ctlTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false
 }

@@ -76,7 +76,7 @@ type QueueSet interface {
 	// returned bool indicates whether the QueueSet was idle at the
 	// moment of the return. Otherwise idle==false and the client
 	// must call the Wait method of the Request exactly once.
-	StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (req Request, idle bool)
+	StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req Request, idle bool)
 }

 // Request represents the remainder of the handling of one request
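The interface change above is the pivot of the whole PR: every QueueSet implementation (the real queueset, the controller-test fake, and the no-restraint fake) now receives the FlowSchema name at StartRequest time. A self-contained caller sketch, using trimmed-down illustrative copies of the types (the real ones live in k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing; Finish is the method the tests in this diff call on a returned request):

    package main

    import (
        "context"
        "fmt"
    )

    // Trimmed copies of the interfaces for illustration only.
    type Request interface {
        Finish(execFn func()) (idle bool)
    }

    type QueueSet interface {
        StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (req Request, idle bool)
    }

    // noRestraint mirrors the testing fake in this diff: it admits everything.
    type noRestraint struct{}
    type noRestraintRequest struct{}

    func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (Request, bool) {
        return noRestraintRequest{}, false
    }

    func (noRestraintRequest) Finish(execFn func()) bool {
        execFn()
        return false
    }

    func main() {
        var qs QueueSet = noRestraint{}
        req, idle := qs.StartRequest(context.Background(), 42, "my-flowschema", "descr1", "descr2")
        fmt.Println("idle at start:", idle)
        if req == nil {
            return // rejected outright; the QueueSet counted the reject
        }
        req.Finish(func() { fmt.Println("executing") })
    }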
@@ -33,6 +33,7 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )

@@ -218,10 +218,10 @@ const (

 // StartRequest begins the process of handling a request. We take the
 // approach of updating the metrics about total requests queued and
-// executing on each path out of this method and Request::Wait. We do
-// not update those metrics in lower level functions because there can
-// be multiple lower level changes in one invocation here.
-func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (fq.Request, bool) {
+// executing at each point where there is a change in that quantity,
+// because the metrics --- and only the metrics --- track that
+// quantity per FlowSchema.
+func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
 	qs.lockAndSyncTime()
 	defer qs.lock.Unlock()
 	var req *request
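The rewritten doc comment states the invariant behind the rest of the diff: the queued and executing gauges are the only per-FlowSchema record of those quantities, so they must be adjusted at every point where the quantity changes rather than snapshotted on selected exit paths. A toy model of that bookkeeping, not the real implementation:

    package main

    import (
        "fmt"
        "sync"
    )

    // A minimal stand-in for the per-flowSchema gauges: every state
    // transition applies a delta at the moment it happens, so the map is
    // always exact. This mirrors the intent of AddRequestsInQueues and
    // AddRequestsExecuting in this diff.
    type gauges struct {
        mu        sync.Mutex
        inqueue   map[string]int
        executing map[string]int
    }

    func newGauges() *gauges {
        return &gauges{inqueue: map[string]int{}, executing: map[string]int{}}
    }

    func (g *gauges) addInQueue(fs string, delta int)   { g.mu.Lock(); g.inqueue[fs] += delta; g.mu.Unlock() }
    func (g *gauges) addExecuting(fs string, delta int) { g.mu.Lock(); g.executing[fs] += delta; g.mu.Unlock() }

    func main() {
        g := newGauges()
        g.addInQueue("client0", 1)    // enqueueLocked
        g.addInQueue("client0", -1)   // dispatchLocked: leaves the queue...
        g.addExecuting("client0", 1)  // ...and starts executing
        g.addExecuting("client0", -1) // finishRequestLocked
        fmt.Println(g.inqueue["client0"], g.executing["client0"]) // 0 0
    }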
@@ -231,11 +231,11 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
 	// Apply only concurrency limit, if zero queues desired
 	if qs.qCfg.DesiredNumQueues < 1 {
 		if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
-			klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
+			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
+			metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit")
 			return nil, qs.isIdleLocked()
 		}
-		req = qs.dispatchSansQueueLocked(ctx, descr1, descr2)
-		metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
+		req = qs.dispatchSansQueueLocked(ctx, fsName, descr1, descr2)
 		return req, false
 	}

@@ -246,13 +246,12 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
 	// 3) Reject current request if there is not enough concurrency shares and
 	// we are at max queue length
 	// 4) If not rejected, create a request and enqueue
-	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, descr1, descr2)
-	defer metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
+	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, fsName, descr1, descr2)
 	// req == nil means that the request was rejected - no remaining
 	// concurrency shares and at max queue length already
 	if req == nil {
-		klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2)
-		metrics.AddReject(qs.qCfg.Name, "queue-full")
+		klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
+		metrics.AddReject(qs.qCfg.Name, fsName, "queue-full")
 		return nil, qs.isIdleLocked()
 	}

@@ -266,7 +265,6 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
 	// fair queuing technique to pick a queue and dispatch a
 	// request from that queue.
 	qs.dispatchAsMuchAsPossibleLocked()
-	defer metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)

 	// ========================================================================
 	// Step 3:

@@ -288,7 +286,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
 			// known that the count does not need to be accurate.
 			// BTW, the count only needs to be accurate in a test that
 			// uses FakeEventClock::Run().
-			klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2)
+			klog.V(6).Infof("QS(%s): Context of request %q %#+v %#+v is Done", qs.qCfg.Name, fsName, descr1, descr2)
 			qs.cancelWait(req)
 			qs.goroutineDoneOrBlocked()
 		}()

@@ -329,7 +327,7 @@ func (req *request) wait() (bool, bool) {
 	switch decision {
 	case decisionReject:
 		klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
-		metrics.AddReject(qs.qCfg.Name, "time-out")
+		metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out")
 		return false, qs.isIdleLocked()
 	case decisionCancel:
 		// TODO(aaron-prindle) add metrics for this case

@@ -400,12 +398,12 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
 // returns the enqueud request on a successful enqueue
 // returns nil in the case that there is no available concurrency or
 // the queuelengthlimit has been reached
-func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) *request {
+func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) *request {
 	// Start with the shuffle sharding, to pick a queue.
 	queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
 	queue := qs.queues[queueIdx]
 	// The next step is the logic to reject requests that have been waiting too long
-	qs.removeTimedOutRequestsFromQueueLocked(queue)
+	qs.removeTimedOutRequestsFromQueueLocked(queue, fsName)
 	// NOTE: currently timeout is only checked for each new request. This means that there can be
 	// requests that are in the queue longer than the timeout if there are no new requests
 	// We prefer the simplicity over the promptness, at least for now.

@@ -413,6 +411,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
 	// Create a request and enqueue
 	req := &request{
 		qs:          qs,
+		fsName:      fsName,
 		ctx:         ctx,
 		decision:    lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
 		arrivalTime: qs.clock.Now(),

@@ -423,7 +422,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
 	if ok := qs.rejectOrEnqueueLocked(req); !ok {
 		return nil
 	}
-	metrics.ObserveQueueLength(qs.qCfg.Name, len(queue.requests))
+	metrics.ObserveQueueLength(qs.qCfg.Name, fsName, len(queue.requests))
 	return req
 }

@@ -446,7 +445,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte

 // removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
 // past the requestWaitLimit
-func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
+func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
 	timeoutIdx := -1
 	now := qs.clock.Now()
 	reqs := queue.requests

@@ -461,6 +460,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
 			req.decision.SetLocked(decisionReject)
 			// get index for timed out requests
 			timeoutIdx = i
+			metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
 		} else {
 			break
 		}

@@ -505,6 +505,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
 	}
 	queue.Enqueue(request)
 	qs.totRequestsWaiting++
+	metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1)
 }

 // dispatchAsMuchAsPossibleLocked runs a loop, as long as there

@@ -522,10 +523,11 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
 	}
 }

-func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, descr1, descr2 interface{}) *request {
+func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, fsName string, descr1, descr2 interface{}) *request {
 	now := qs.clock.Now()
 	req := &request{
 		qs:          qs,
+		fsName:      fsName,
 		ctx:         ctx,
 		startTime:   now,
 		decision:    lockingpromise.NewWriteOnce(&qs.lock, qs.counter),

@@ -535,8 +537,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, descr1, descr2
 	}
 	req.decision.SetLocked(decisionExecute)
 	qs.totRequestsExecuting++
+	metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1)
 	if klog.V(5) {
-		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
+		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
 	}
 	return req
 }

@@ -563,6 +566,8 @@ func (qs *queueSet) dispatchLocked() bool {
 	qs.totRequestsWaiting--
 	qs.totRequestsExecuting++
 	queue.requestsExecuting++
+	metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1)
+	metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1)
 	if klog.V(6) {
 		klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
 	}

@@ -590,6 +595,7 @@ func (qs *queueSet) cancelWait(req *request) {
 			// remove the request
 			queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
 			qs.totRequestsWaiting--
+			metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
 			break
 		}
 	}

@@ -634,8 +640,6 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool

 	qs.finishRequestLocked(req)
 	qs.dispatchAsMuchAsPossibleLocked()
-	metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting)
-	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting)
 	return qs.isIdleLocked()
 }

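All of the queueset.go hunks above follow one recipe: the snapshot-style Update*/Set calls on selected exit paths are deleted, and plus-or-minus-one deltas are applied exactly where totRequestsWaiting and totRequestsExecuting change (enqueueLocked, dispatchLocked, dispatchSansQueueLocked, the in-queue timeout, cancelWait, finishRequestLocked). The sketch below shows why deltas are the natural fit once a flowSchema label exists, using the upstream Prometheus client directly; the k8s.io/component-base/metrics wrappers used in this diff delegate to equivalent calls:

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/testutil"
    )

    func main() {
        // Illustrative copy of the gauge in this diff, built on the upstream client.
        inqueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Name: "apiserver_flowcontrol_current_inqueue_requests",
            Help: "Number of requests currently pending in queues (sketch)",
        }, []string{"priorityLevel", "flowSchema"})

        child := inqueue.WithLabelValues("global-default", "client0")

        // Old style would Set a per-priorityLevel total; with a flowSchema
        // label there is no single child holding "the total", so each
        // transition applies a delta on the exact child instead.
        child.Add(1)  // enqueueLocked
        child.Add(1)  // a second request of the same flow arrives
        child.Add(-1) // dispatchLocked moves one request to executing

        fmt.Println(testutil.ToFloat64(child)) // 1
    }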
@@ -644,6 +648,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
 // callback updates important state in the queueSet
 func (qs *queueSet) finishRequestLocked(r *request) {
 	qs.totRequestsExecuting--
+	metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1)

 	if r.queue == nil {
 		if klog.V(6) {

@@ -18,6 +18,7 @@ package queueset

 import (
 	"context"
+	"fmt"
 	"math"
 	"sync/atomic"
 	"testing"

@@ -27,6 +28,7 @@ import (
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
 	test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
+	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 	"k8s.io/klog"
 )

@@ -52,24 +54,36 @@ type uniformClient struct {
 // expectPass indicates whether the QueueSet is expected to be fair.
 // expectedAllRequests indicates whether all requests are expected to get dispatched.
 func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario,
-	evalDuration time.Duration, expectPass bool, expectedAllRequests bool,
+	evalDuration time.Duration,
+	expectPass, expectedAllRequests, expectInqueueMetrics, expectExecutingMetrics bool,
+	rejectReason string,
 	clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {

 	now := time.Now()
 	t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter)
 	integrators := make([]test.Integrator, len(sc))
 	var failedCount uint64
+	expectedInqueue := ""
+	expectedExecuting := ""
+	if expectInqueueMetrics || expectExecutingMetrics {
+		metrics.Reset()
+	}
+	executions := make([]int32, len(sc))
+	rejects := make([]int32, len(sc))
 	for i, uc := range sc {
 		integrators[i] = test.NewIntegrator(clk)
+		fsName := fmt.Sprintf("client%d", i)
+		expectedInqueue = expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n")
 		for j := 0; j < uc.nThreads; j++ {
 			counter.Add(1)
 			go func(i, j int, uc uniformClient, igr test.Integrator) {
 				for k := 0; k < uc.nCalls; k++ {
 					ClockWait(clk, counter, uc.thinkDuration)
-					req, idle := qs.StartRequest(context.Background(), uc.hash, name, []int{i, j, k})
+					req, idle := qs.StartRequest(context.Background(), uc.hash, fsName, name, []int{i, j, k})
 					t.Logf("%s: %d, %d, %d got req=%p, idle=%v", clk.Now().Format(nsTimeFmt), i, j, k, req, idle)
 					if req == nil {
 						atomic.AddUint64(&failedCount, 1)
+						atomic.AddInt32(&rejects[i], 1)
 						break
 					}
 					if idle {
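The scenario driver above now precomputes, per simulated client, the exposition lines it expects to see, using %q so the label values get the quoting the Prometheus text format uses. A runnable sketch of just the string-building step (names are illustrative):

    package main

    import "fmt"

    // Sketch of how the test composes expected text-exposition samples per
    // client flow; the QueueSet name doubles as the priorityLevel label.
    func main() {
        name := "TestUniformFlows" // illustrative
        expected := ""
        for i := 0; i < 2; i++ {
            fsName := fmt.Sprintf("client%d", i)
            expected += fmt.Sprintf(
                "apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0\n",
                fsName, name)
        }
        fmt.Print(expected)
    }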
@@ -79,6 +93,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
 					idle2 := req.Finish(func() {
 						executed = true
 						t.Logf("%s: %d, %d, %d executing", clk.Now().Format(nsTimeFmt), i, j, k)
+						atomic.AddInt32(&executions[i], 1)
 						igr.Add(1)
 						ClockWait(clk, counter, uc.execDuration)
 						igr.Add(-1)

@@ -86,6 +101,7 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
 					t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", clk.Now().Format(nsTimeFmt), i, j, k, executed, idle2)
 					if !executed {
 						atomic.AddUint64(&failedCount, 1)
+						atomic.AddInt32(&rejects[i], 1)
 					}
 				}
 				counter.Add(-1)

@@ -124,6 +140,52 @@ func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet,
 	} else if !expectedAllRequests && failedCount == 0 {
 		t.Errorf("Expected failed requests but all requests succeeded")
 	}
+	if expectInqueueMetrics {
+		e := `
+# HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system
+# TYPE apiserver_flowcontrol_current_inqueue_requests gauge
+` + expectedInqueue
+		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests")
+		if err != nil {
+			t.Error(err)
+		} else {
+			t.Log("Success with" + e)
+		}
+	}
+	expectedRejects := ""
+	for i := range sc {
+		fsName := fmt.Sprintf("client%d", i)
+		if atomic.AddInt32(&executions[i], 0) > 0 {
+			expectedExecuting = expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, name, "\n")
+		}
+		if atomic.AddInt32(&rejects[i], 0) > 0 {
+			expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flowSchema=%q,priorityLevel=%q,reason=%q} %d%s`, fsName, name, rejectReason, rejects[i], "\n")
+		}
+	}
+	if expectExecutingMetrics && len(expectedExecuting) > 0 {
+		e := `
+# HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system
+# TYPE apiserver_flowcontrol_current_executing_requests gauge
+` + expectedExecuting
+		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests")
+		if err != nil {
+			t.Error(err)
+		} else {
+			t.Log("Success with" + e)
+		}
+	}
+	if expectExecutingMetrics && len(expectedRejects) > 0 {
+		e := `
+# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
+# TYPE apiserver_flowcontrol_rejected_requests_total counter
+` + expectedRejects
+		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total")
+		if err != nil {
+			t.Error(err)
+		} else {
+			t.Log("Success with" + e)
+		}
+	}
 }

 func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
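The verification block above gathers from the registry and compares against expected text exposition. The same technique with the upstream client, which the new metrics.GatherAndCompare helper wraps for the legacy global registry; the registry, metric, and values here are illustrative:

    package main

    import (
        "fmt"
        "strings"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/testutil"
    )

    func main() {
        reg := prometheus.NewRegistry()
        rejected := prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "apiserver_flowcontrol_rejected_requests_total",
            Help: "Number of requests rejected by API Priority and Fairness system",
        }, []string{"priorityLevel", "flowSchema", "reason"})
        reg.MustRegister(rejected)

        rejected.WithLabelValues("TestTimeout", "client0", "time-out").Add(3)

        // Expected exposition text; labels are rendered in alphabetical order.
        expected := `
# HELP apiserver_flowcontrol_rejected_requests_total Number of requests rejected by API Priority and Fairness system
# TYPE apiserver_flowcontrol_rejected_requests_total counter
apiserver_flowcontrol_rejected_requests_total{flowSchema="client0",priorityLevel="TestTimeout",reason="time-out"} 3
`
        if err := testutil.GatherAndCompare(reg, strings.NewReader(expected),
            "apiserver_flowcontrol_rejected_requests_total"); err != nil {
            panic(err)
        }
        fmt.Println("metrics match")
    }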
@@ -144,6 +206,7 @@ func init() {

 // TestNoRestraint should fail because the dummy QueueSet exercises no control
 func TestNoRestraint(t *testing.T) {
+	metrics.Register()
 	now := time.Now()
 	clk, counter := clock.NewFakeEventClock(now, 0, nil)
 	nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{})

@@ -154,10 +217,11 @@ func TestNoRestraint(t *testing.T) {
 	exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
 		{1001001001, 5, 10, time.Second, time.Second},
 		{2002002002, 2, 10, time.Second, time.Second / 2},
-	}, time.Second*10, false, true, clk, counter)
+	}, time.Second*10, false, true, false, false, "", clk, counter)
 }

 func TestUniformFlows(t *testing.T) {
+	metrics.Register()
 	now := time.Now()

 	clk, counter := clock.NewFakeEventClock(now, 0, nil)

@@ -175,13 +239,14 @@ func TestUniformFlows(t *testing.T) {
 	}
 	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})

-	exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{
+	exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
 		{1001001001, 5, 10, time.Second, time.Second},
 		{2002002002, 5, 10, time.Second, time.Second},
-	}, time.Second*20, true, true, clk, counter)
+	}, time.Second*20, true, true, true, true, "", clk, counter)
 }

 func TestDifferentFlows(t *testing.T) {
+	metrics.Register()
 	now := time.Now()

 	clk, counter := clock.NewFakeEventClock(now, 0, nil)

@@ -199,13 +264,14 @@ func TestDifferentFlows(t *testing.T) {
 	}
 	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})

-	exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
+	exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
 		{1001001001, 6, 10, time.Second, time.Second},
 		{2002002002, 5, 15, time.Second, time.Second / 2},
-	}, time.Second*20, true, true, clk, counter)
+	}, time.Second*20, true, true, true, true, "", clk, counter)
 }

 func TestDifferentFlowsWithoutQueuing(t *testing.T) {
+	metrics.Register()
 	now := time.Now()

 	clk, counter := clock.NewFakeEventClock(now, 0, nil)

@@ -220,13 +286,24 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 	}
 	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})

-	exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{
+	exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
 		{1001001001, 6, 10, time.Second, 57 * time.Millisecond},
 		{2002002002, 4, 15, time.Second, 750 * time.Millisecond},
-	}, time.Second*13, false, false, clk, counter)
+	}, time.Second*13, false, false, false, true, "concurrency-limit", clk, counter)
+	err = metrics.GatherAndCompare(`
+# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
+# TYPE apiserver_flowcontrol_rejected_requests_total counter
+apiserver_flowcontrol_rejected_requests_total{flowSchema="client0",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 2
+apiserver_flowcontrol_rejected_requests_total{flowSchema="client1",priorityLevel="TestDifferentFlowsWithoutQueuing",reason="concurrency-limit"} 4
+`,
+		"apiserver_flowcontrol_rejected_requests_total")
+	if err != nil {
+		t.Fatal(err)
+	}
 }

 func TestTimeout(t *testing.T) {
+	metrics.Register()
 	now := time.Now()

 	clk, counter := clock.NewFakeEventClock(now, 0, nil)

@@ -244,17 +321,19 @@ func TestTimeout(t *testing.T) {
 	}
 	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})

-	exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{
+	exerciseQueueSetUniformScenario(t, qCfg.Name, qs, []uniformClient{
 		{1001001001, 5, 100, time.Second, time.Second},
-	}, time.Second*10, true, false, clk, counter)
+	}, time.Second*10, true, false, true, true, "time-out", clk, counter)
 }

 func TestContextCancel(t *testing.T) {
+	metrics.Register()
+	metrics.Reset()
 	now := time.Now()
 	clk, counter := clock.NewFakeEventClock(now, 0, nil)
 	qsf := NewQueueSetFactory(clk, counter)
 	qCfg := fq.QueuingConfig{
-		Name:             "TestTimeout",
+		Name:             "TestContextCancel",
 		DesiredNumQueues: 11,
 		QueueLengthLimit: 11,
 		HandSize:         1,

@@ -267,7 +346,7 @@ func TestContextCancel(t *testing.T) {
 	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
 	counter.Add(1) // account for the goroutine running this test
 	ctx1 := context.Background()
-	req1, _ := qs.StartRequest(ctx1, 1, "test", "one")
+	req1, _ := qs.StartRequest(ctx1, 1, "fs1", "test", "one")
 	if req1 == nil {
 		t.Error("Request rejected")
 		return

@@ -283,7 +362,7 @@ func TestContextCancel(t *testing.T) {
 		counter.Add(1)
 		cancel2()
 	}()
-	req2, idle2a := qs.StartRequest(ctx2, 2, "test", "two")
+	req2, idle2a := qs.StartRequest(ctx2, 2, "fs2", "test", "two")
 	if idle2a {
 		t.Error("2nd StartRequest returned idle")
 	}
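Each test above now begins with metrics.Register(), and TestContextCancel also calls metrics.Reset(): the legacy registry is process-global, so registration must be idempotent regardless of test order, and counters accumulate across tests. A sketch of that discipline; the once-guard is an assumption consistent with the `})` that closes Register in the metrics.go hunk further down and the "sync" import already present in that file:

    package main

    import (
        "fmt"
        "sync"
    )

    var (
        registerOnce sync.Once
        registered   bool
    )

    // Register may be called from every test; only the first call does work.
    func Register() {
        registerOnce.Do(func() {
            registered = true // stand-in for MustRegister calls on the real collectors
        })
    }

    func main() {
        Register()
        Register() // safe from any number of tests, in any order
        fmt.Println("registered:", registered)
    }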
|
@ -26,8 +26,9 @@ import (
|
|||||||
// request is a temporary container for "requests" with additional
|
// request is a temporary container for "requests" with additional
|
||||||
// tracking fields required for the functionality FQScheduler
|
// tracking fields required for the functionality FQScheduler
|
||||||
type request struct {
|
type request struct {
|
||||||
qs *queueSet
|
qs *queueSet
|
||||||
ctx context.Context
|
fsName string
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
// The relevant queue. Is nil if this request did not go through
|
// The relevant queue. Is nil if this request did not go through
|
||||||
// a queue.
|
// a queue.
|
||||||
|
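Storing fsName on the request is what lets the late lifecycle events (queue timeout, cancelWait, finishRequestLocked), which only have the request in hand, label their gauge deltas. A trimmed illustration, with a hypothetical stand-in for the metrics helper:

    package main

    import "fmt"

    // The FlowSchema name is captured once, at StartRequest time, and
    // travels with the request for the rest of its life.
    type request struct {
        fsName string
    }

    // Stand-in for metrics.AddRequestsInQueues from this diff.
    func addRequestsInQueues(priorityLevel, flowSchema string, delta int) {
        fmt.Printf("inqueue{priorityLevel=%q,flowSchema=%q} += %d\n", priorityLevel, flowSchema, delta)
    }

    // cancelWait-style handler: no FlowSchema argument is available here,
    // so the stored field supplies the label.
    func cancel(plName string, req *request) {
        addRequestsInQueues(plName, req.fsName, -1)
    }

    func main() {
        cancel("global-default", &request{fsName: "client0"})
    }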
@@ -53,7 +53,7 @@ func (noRestraint) IsIdle() bool {
 	return false
 }

-func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (fq.Request, bool) {
+func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
 	return noRestraintRequest{}, false
 }

@@ -9,6 +9,7 @@ go_library(
     deps = [
         "//staging/src/k8s.io/component-base/metrics:go_default_library",
         "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
+        "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
     ],
 )

@@ -17,11 +17,13 @@ limitations under the License.
 package metrics

 import (
+	"strings"
 	"sync"
 	"time"

 	compbasemetrics "k8s.io/component-base/metrics"
 	"k8s.io/component-base/metrics/legacyregistry"
+	basemetricstestutil "k8s.io/component-base/metrics/testutil"
 )

 const (
@@ -50,41 +52,67 @@ func Register() {
 	})
 }

+type resettable interface {
+	Reset()
+}
+
+// Reset all metrics to zero
+func Reset() {
+	for _, metric := range metrics {
+		rm := metric.(resettable)
+		rm.Reset()
+	}
+}
+
+// GatherAndCompare the given metrics with the given Prometheus syntax expected value
+func GatherAndCompare(expected string, metricNames ...string) error {
+	return basemetricstestutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...)
+}
+
 var (
 	apiserverRejectedRequestsTotal = compbasemetrics.NewCounterVec(
 		&compbasemetrics.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "rejected_requests_total",
-			Help:      "Number of rejected requests by api priority and fairness system",
+			Help:      "Number of requests rejected by API Priority and Fairness system",
 		},
-		[]string{priorityLevel, "reason"},
+		[]string{priorityLevel, flowSchema, "reason"},
+	)
+	apiserverDispatchedRequestsTotal = compbasemetrics.NewCounterVec(
+		&compbasemetrics.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "dispatched_requests_total",
+			Help:      "Number of requests released by API Priority and Fairness system for service",
+		},
+		[]string{priorityLevel, flowSchema},
 	)
 	apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "current_inqueue_requests",
-			Help:      "Number of requests currently pending in the queue by the api priority and fairness system",
+			Help:      "Number of requests currently pending in queues of the API Priority and Fairness system",
 		},
-		[]string{priorityLevel},
+		[]string{priorityLevel, flowSchema},
 	)
 	apiserverRequestQueueLength = compbasemetrics.NewHistogramVec(
 		&compbasemetrics.HistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
-			Name:      "request_queue_length",
-			Help:      "Length of queue in the api priority and fairness system",
+			Name:      "request_queue_length_after_enqueue",
+			Help:      "Length of queue in the API Priority and Fairness system, as seen by each request after it is enqueued",
 			Buckets:   queueLengthBuckets,
 		},
-		[]string{priorityLevel},
+		[]string{priorityLevel, flowSchema},
 	)
 	apiserverRequestConcurrencyLimit = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "request_concurrency_limit",
-			Help:      "Shared concurrency limit in the api priority and fairness system",
+			Help:      "Shared concurrency limit in the API Priority and Fairness system",
 		},
 		[]string{priorityLevel},
 	)
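Reset works by asserting each registered metric to a one-method interface. The same pattern against the upstream client, where every metric vector type carries a Reset method; the diff's version asserts unconditionally because all of its metrics are vectors, while this sketch checks the assertion:

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    type resettable interface{ Reset() }

    // resetAll clears every collector that supports Reset.
    func resetAll(cs []prometheus.Collector) {
        for _, c := range cs {
            if rm, ok := c.(resettable); ok {
                rm.Reset()
            }
        }
    }

    func main() {
        rejected := prometheus.NewCounterVec(
            prometheus.CounterOpts{Name: "rejected_total", Help: "sketch"},
            []string{"reason"})
        rejected.WithLabelValues("time-out").Inc()
        resetAll([]prometheus.Collector{rejected})
        fmt.Println("after reset, the vector has no children")
    }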
@@ -93,9 +121,9 @@ var (
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "current_executing_requests",
-			Help:      "Number of requests currently executing in the api priority and fairness system",
+			Help:      "Number of requests currently executing in the API Priority and Fairness system",
 		},
-		[]string{priorityLevel},
+		[]string{priorityLevel, flowSchema},
 	)
 	apiserverRequestWaitingSeconds = compbasemetrics.NewHistogramVec(
 		&compbasemetrics.HistogramOpts{

@@ -112,13 +140,14 @@ var (
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "request_execution_seconds",
-			Help:      "Time of request executing in the api priority and fairness system",
+			Help:      "Duration of request execution in the API Priority and Fairness system",
 			Buckets:   requestDurationSecondsBuckets,
 		},
 		[]string{priorityLevel, flowSchema},
 	)
 	metrics = []compbasemetrics.Registerable{
 		apiserverRejectedRequestsTotal,
+		apiserverDispatchedRequestsTotal,
 		apiserverCurrentInqueueRequests,
 		apiserverRequestQueueLength,
 		apiserverRequestConcurrencyLimit,
@@ -128,14 +157,14 @@ var (
 	}
 )

-// UpdateFlowControlRequestsInQueue updates the value for the # of requests in the specified queues in flow control
-func UpdateFlowControlRequestsInQueue(priorityLevel string, inqueue int) {
-	apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel).Set(float64(inqueue))
+// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
+func AddRequestsInQueues(priorityLevel, flowSchema string, delta int) {
+	apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
 }

-// UpdateFlowControlRequestsExecuting updates the value for the # of requests executing in flow control
-func UpdateFlowControlRequestsExecuting(priorityLevel string, executing int) {
-	apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel).Set(float64(executing))
+// AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel
+func AddRequestsExecuting(priorityLevel, flowSchema string, delta int) {
+	apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
 }

 // UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control
@@ -144,13 +173,18 @@ func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
 }

 // AddReject increments the # of rejected requests for flow control
-func AddReject(priorityLevel string, reason string) {
-	apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, reason).Add(1)
+func AddReject(priorityLevel, flowSchema, reason string) {
+	apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, flowSchema, reason).Add(1)
+}
+
+// AddDispatch increments the # of dispatched requests for flow control
+func AddDispatch(priorityLevel, flowSchema string) {
+	apiserverDispatchedRequestsTotal.WithLabelValues(priorityLevel, flowSchema).Add(1)
 }

 // ObserveQueueLength observes the queue length for flow control
-func ObserveQueueLength(priorityLevel string, length int) {
-	apiserverRequestQueueLength.WithLabelValues(priorityLevel).Observe(float64(length))
+func ObserveQueueLength(priorityLevel, flowSchema string, length int) {
+	apiserverRequestQueueLength.WithLabelValues(priorityLevel, flowSchema).Observe(float64(length))
 }

 // ObserveWaitingDuration observes the queue length for flow control
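Taken together, the metrics package now exposes per-FlowSchema counters and delta-driven gauges. A usage sketch of the revised helpers, with signatures exactly as in this diff (the import path appears in the test hunks above; the label values here are made up, and gauge increments must be paired with matching decrements or the gauge drifts):

    package main

    import (
        "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
    )

    func main() {
        metrics.Register()

        // Counters now carry the flowSchema label.
        metrics.AddReject("catch-all", "my-flowschema", "queue-full")
        metrics.AddDispatch("catch-all", "my-flowschema")

        // Histogram observation, also per flowSchema.
        metrics.ObserveQueueLength("catch-all", "my-flowschema", 3)

        // Gauges move by deltas: +1 on enqueue, -1 when the request leaves.
        metrics.AddRequestsInQueues("catch-all", "my-flowschema", 1)
        metrics.AddRequestsInQueues("catch-all", "my-flowschema", -1)
    }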