Add sample-and-watermark for seats occupied during all of execution
parent 76694983a1
commit 945f960cfb
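For context before the diff: a sample-and-watermark observer tracks one quantity (here, the number of seats occupied), records its normalized value into a "samples" histogram once per sampling period, and also records the highest and lowest values reached between samples into a "watermarks" histogram, so short spikes survive coarse sampling. Below is a minimal, self-contained sketch of that idea; all names are illustrative, and this is not the k8s.io/apiserver metrics implementation that the commit extends.

    package main

    import (
        "fmt"
        "time"
    )

    // sampleAndWaterMark tracks a value x, normalized by a denominator x1.
    // Once per samplePeriod it records x/x1 as a sample; at the same time it
    // records the high and low watermarks of x/x1 seen since the previous
    // sample, then resets them to the current value.
    type sampleAndWaterMark struct {
        samplePeriod   time.Duration
        x1             float64 // normalization denominator
        x              float64 // current value, e.g. seats occupied
        relX           float64 // x / x1
        loRelX, hiRelX float64 // watermarks since the last sample
        lastSample     time.Time
        samples        []float64 // stand-ins for histogram observations
        waterMarks     []float64
    }

    func (o *sampleAndWaterMark) Add(delta float64) { o.set(o.x + delta) }

    func (o *sampleAndWaterMark) set(x float64) {
        now := time.Now()
        // Emit one sample (plus the two watermarks) per elapsed period.
        for !now.Before(o.lastSample.Add(o.samplePeriod)) {
            o.samples = append(o.samples, o.relX)
            o.waterMarks = append(o.waterMarks, o.loRelX, o.hiRelX)
            o.loRelX, o.hiRelX = o.relX, o.relX
            o.lastSample = o.lastSample.Add(o.samplePeriod)
        }
        o.x = x
        o.relX = x / o.x1
        if o.relX < o.loRelX {
            o.loRelX = o.relX
        } else if o.relX > o.hiRelX {
            o.hiRelX = o.relX
        }
    }

    func main() {
        obs := &sampleAndWaterMark{samplePeriod: time.Millisecond, x1: 10, lastSample: time.Now()}
        obs.Add(3)                       // a request occupying 3 seats begins executing
        time.Sleep(5 * time.Millisecond) // several sample periods elapse
        obs.Add(-3)                      // the request finishes
        fmt.Printf("samples=%v watermarks=%v\n", obs.samples, obs.waterMarks)
    }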
@@ -100,10 +100,11 @@ type RequestDigest struct {
 // this type and cfgMeal follow the convention that the suffix
 // "Locked" means that the caller must hold the configController lock.
 type configController struct {
-    name             string // varies in tests of fighting controllers
-    clock            clock.PassiveClock
-    queueSetFactory  fq.QueueSetFactory
-    obsPairGenerator metrics.TimedObserverPairGenerator
+    name                  string // varies in tests of fighting controllers
+    clock                 clock.PassiveClock
+    queueSetFactory       fq.QueueSetFactory
+    reqsObsPairGenerator  metrics.TimedObserverPairGenerator
+    execSeatsObsGenerator metrics.TimedObserverGenerator
 
     // How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
     asFieldManager string
@@ -192,8 +193,11 @@ type priorityLevelState struct {
     // returned StartFunction
     numPending int
 
-    // Observers tracking number waiting, executing
-    obsPair metrics.TimedObserverPair
+    // Observers tracking number of requests waiting, executing
+    reqsObsPair metrics.TimedObserverPair
+
+    // Observer of number of seats occupied throughout execution
+    execSeatsObs metrics.TimedObserver
 }
 
 // NewTestableController is extra flexible to facilitate testing
@@ -202,7 +206,8 @@ func newTestableController(config TestableConfig) *configController {
         name:                   config.Name,
         clock:                  config.Clock,
         queueSetFactory:        config.QueueSetFactory,
-        obsPairGenerator:       config.ObsPairGenerator,
+        reqsObsPairGenerator:   config.ReqsObsPairGenerator,
+        execSeatsObsGenerator:  config.ExecSeatsObsGenerator,
         asFieldManager:         config.AsFieldManager,
         foundToDangling:        config.FoundToDangling,
         serverConcurrencyLimit: config.ServerConcurrencyLimit,
@@ -534,9 +539,10 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
     for _, pl := range newPLs {
         state := meal.cfgCtlr.priorityLevelStates[pl.Name]
         if state == nil {
-            state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})}
+            labelValues := []string{pl.Name}
+            state = &priorityLevelState{reqsObsPair: meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues)}
         }
-        qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair)
+        qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsObsPair, state.execSeatsObs)
         if err != nil {
             klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
             continue
@@ -639,7 +645,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
         }
     }
     var err error
-    plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair)
+    plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsObsPair, plState.execSeatsObs)
     if err != nil {
         // This can not happen because queueSetCompleterForPL already approved this config
         panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@@ -688,7 +694,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 // given priority level configuration.  Returns nil if that config
 // does not call for limiting.  Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
-func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
     if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
         return nil, errors.New("broken union structure at the top")
     }
@@ -717,7 +723,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
     if queues != nil {
         qsc, err = queues.BeginConfigChange(qcQS)
     } else {
-        qsc, err = qsf.BeginConstruction(qcQS, intPair)
+        qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs)
     }
     if err != nil {
         err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
@@ -762,17 +768,20 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
 // that does not actually exist (right now) as a real API object.
 func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
     klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
-    obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
-    qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
+    labelValues := []string{proto.Name}
+    reqsObsPair := meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues)
+    execSeatsObs := meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues)
+    qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsObsPair, execSeatsObs)
     if err != nil {
         // This can not happen because proto is one of the mandatory
         // objects and these are not erroneous
         panic(err)
     }
     meal.newPLStates[proto.Name] = &priorityLevelState{
-        pl:          proto,
-        qsCompleter: qsCompleter,
-        obsPair:     obsPair,
+        pl:           proto,
+        qsCompleter:  qsCompleter,
+        reqsObsPair:  reqsObsPair,
+        execSeatsObs: execSeatsObs,
     }
     if proto.Spec.Limited != nil {
         meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
@@ -92,7 +92,8 @@ func New(
         FlowcontrolClient:      flowcontrolClient,
         ServerConcurrencyLimit: serverConcurrencyLimit,
         RequestWaitLimit:       requestWaitLimit,
-        ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+        ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+        ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
         QueueSetFactory:        fqs.NewQueueSetFactory(clk),
     })
 }
@@ -131,8 +132,11 @@ type TestableConfig struct {
     // RequestWaitLimit configured on the server
     RequestWaitLimit time.Duration
 
-    // ObsPairGenerator for metrics
-    ObsPairGenerator metrics.TimedObserverPairGenerator
+    // ReqsObsPairGenerator generates the observer pair for metrics about requests
+    ReqsObsPairGenerator metrics.TimedObserverPairGenerator
+
+    // ExecSeatsObsGenerator generates observers for metrics about seats occupied by all phases of execution
+    ExecSeatsObsGenerator metrics.TimedObserverGenerator
 
     // QueueSetFactory for the queuing implementation
     QueueSetFactory fq.QueueSetFactory
@@ -105,7 +105,7 @@ type ctlrTestRequest struct {
     descr1, descr2 interface{}
 }
 
-func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, ip metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) {
     return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
 }
@@ -261,7 +261,8 @@ func TestConfigConsumer(t *testing.T) {
         FlowcontrolClient:      flowcontrolClient,
         ServerConcurrencyLimit: 100,         // server concurrency limit
         RequestWaitLimit:       time.Minute, // request wait limit
-        ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+        ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+        ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
         QueueSetFactory:        cts,
     })
     cts.cfgCtlr = ctlr
@@ -392,7 +393,8 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
         FlowcontrolClient:      flowcontrolClient,
         ServerConcurrencyLimit: 100,
         RequestWaitLimit:       time.Minute,
-        ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+        ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+        ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
         QueueSetFactory:        cts,
     })
@@ -31,8 +31,11 @@ import (
 // are separated so that errors from the first phase can be found
 // before committing to a concurrency allotment for the second.
 type QueueSetFactory interface {
-    // BeginConstruction does the first phase of creating a QueueSet
-    BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error)
+    // BeginConstruction does the first phase of creating a QueueSet.
+    // The TimedObserverPair observes the numbers of requests waiting and
+    // executing, with execution covering just the regular phase.
+    // The TimedObserver observes the number of seats occupied through all phases of execution.
+    BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error)
 }
 
 // QueueSetCompleter finishes the two-step process of creating or
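To make the new contract concrete, here is a sketch of the two-phase construction as a caller sees it. The generator names are the ones this commit uses elsewhere; buildQueueSet and its parameters are hypothetical scaffolding for illustration, with fq and metrics standing for the fairqueuing and metrics packages imported in the surrounding files.

    func buildQueueSet(qsf fq.QueueSetFactory, qCfg fq.QueuingConfig, dCfg fq.DispatchingConfig, plName string) (fq.QueueSet, error) {
        labelValues := []string{plName}
        reqsObsPair := metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelValues)
        execSeatsObs := metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, labelValues)
        qsc, err := qsf.BeginConstruction(qCfg, reqsObsPair, execSeatsObs)
        if err != nil {
            return nil, err // first-phase errors surface before any concurrency is committed
        }
        return qsc.Complete(dCfg), nil // second phase commits the concurrency allotment
    }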
@@ -60,11 +60,12 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
 // `*queueSetCompleter` implements QueueSetCompleter.  Exactly one of
 // the fields `factory` and `theSet` is non-nil.
 type queueSetCompleter struct {
-    factory *queueSetFactory
-    obsPair metrics.TimedObserverPair
-    theSet  *queueSet
-    qCfg    fq.QueuingConfig
-    dealer  *shufflesharding.Dealer
+    factory      *queueSetFactory
+    reqsObsPair  metrics.TimedObserverPair
+    execSeatsObs metrics.TimedObserver
+    theSet       *queueSet
+    qCfg         fq.QueuingConfig
+    dealer       *shufflesharding.Dealer
 }
 
 // queueSet implements the Fair Queuing for Server Requests technique
@@ -79,7 +80,10 @@ type queueSetCompleter struct {
 type queueSet struct {
     clock                    eventclock.Interface
     estimatedServiceDuration time.Duration
-    obsPair                  metrics.TimedObserverPair
+
+    reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only
+
+    execSeatsObs metrics.TimedObserver // for all phases of execution
 
     promiseFactory promiseFactory
@@ -144,16 +148,17 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
     }
 }
 
-func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
     dealer, err := checkConfig(qCfg)
     if err != nil {
         return nil, err
     }
     return &queueSetCompleter{
-        factory: qsf,
-        obsPair: obsPair,
-        qCfg:    qCfg,
-        dealer:  dealer}, nil
+        factory:      qsf,
+        reqsObsPair:  reqsObsPair,
+        execSeatsObs: execSeatsObs,
+        qCfg:         qCfg,
+        dealer:       dealer}, nil
 }
 
 // checkConfig returns a non-nil Dealer if the config is valid and
@@ -176,7 +181,8 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
     qs = &queueSet{
         clock:                    qsc.factory.clock,
         estimatedServiceDuration: 3 * time.Millisecond,
-        obsPair:                  qsc.obsPair,
+        reqsObsPair:              qsc.reqsObsPair,
+        execSeatsObs:             qsc.execSeatsObs,
         qCfg:                     qsc.qCfg,
         currentR:                 0,
         lastRealTime:             qsc.factory.clock.Now(),
@@ -237,8 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
     if qll < 1 {
         qll = 1
     }
-    qs.obsPair.RequestsWaiting.SetX1(float64(qll))
-    qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
+    qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll))
+    qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
+    qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit))
 
     qs.dispatchAsMuchAsPossibleLocked()
 }
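The SetX1 calls fix the denominator by which the sample-and-watermark observers normalize their readings: with histogram buckets running from 0 to 1 (see the metric definitions later in this diff), each recorded sample is value/X1. A worked example with illustrative numbers, assuming that semantics:

    // Hypothetical: a priority level with dCfg.ConcurrencyLimit = 10 seats,
    // of which 4 are currently occupied.
    occupiedSeats, concurrencyLimit := 4.0, 10.0
    sample := occupiedSeats / concurrencyLimit // 0.4, i.e. 40% utilization
    fmt.Println(sample)                        // recorded in the 0.4 bucket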
@@ -391,7 +398,7 @@ func (req *request) wait() (bool, bool) {
         metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
         metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
         req.NoteQueued(false)
-        qs.obsPair.RequestsWaiting.Add(-1)
+        qs.reqsObsPair.RequestsWaiting.Add(-1)
     }
     return false, qs.isIdleLocked()
 }
@@ -602,7 +609,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
     // remove timed out requests from queue
     if timeoutCount > 0 {
         qs.totRequestsWaiting -= timeoutCount
-        qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
+        qs.reqsObsPair.RequestsWaiting.Add(float64(-timeoutCount))
     }
 }
@@ -638,7 +645,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
     qs.totRequestsWaiting++
     metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
     request.NoteQueued(true)
-    qs.obsPair.RequestsWaiting.Add(1)
+    qs.reqsObsPair.RequestsWaiting.Add(1)
 }
 
 // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
@@ -667,7 +674,8 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
     qs.totSeatsInUse += req.MaxSeats()
     metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
     metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
-    qs.obsPair.RequestsExecuting.Add(1)
+    qs.reqsObsPair.RequestsExecuting.Add(1)
+    qs.execSeatsObs.Add(float64(req.MaxSeats()))
     if klog.V(5).Enabled() {
         klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
     }
@@ -690,7 +698,7 @@ func (qs *queueSet) dispatchLocked() bool {
     qs.totRequestsWaiting--
     metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
     request.NoteQueued(false)
-    qs.obsPair.RequestsWaiting.Add(-1)
+    qs.reqsObsPair.RequestsWaiting.Add(-1)
     defer qs.boundNextDispatchLocked(queue)
     if !request.decision.Set(decisionExecute) {
         return true
@@ -707,7 +715,8 @@ func (qs *queueSet) dispatchLocked() bool {
     queue.seatsInUse += request.MaxSeats()
     metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
     metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
-    qs.obsPair.RequestsExecuting.Add(1)
+    qs.reqsObsPair.RequestsExecuting.Add(1)
+    qs.execSeatsObs.Add(float64(request.MaxSeats()))
     if klog.V(6).Enabled() {
         klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
             qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
@@ -848,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
     now := qs.clock.Now()
     qs.totRequestsExecuting--
     metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
-    qs.obsPair.RequestsExecuting.Add(-1)
+    qs.reqsObsPair.RequestsExecuting.Add(-1)
 
     actualServiceDuration := now.Sub(r.startTime)
@@ -860,6 +869,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 
         qs.totSeatsInUse -= r.MaxSeats()
         metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
+        qs.execSeatsObs.Add(-float64(r.MaxSeats()))
         if r.queue != nil {
             r.queue.seatsInUse -= r.MaxSeats()
         }
@@ -973,8 +983,9 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
 }
 
 func (qs *queueSet) UpdateObservations() {
-    qs.obsPair.RequestsWaiting.Add(0)
-    qs.obsPair.RequestsExecuting.Add(0)
+    qs.reqsObsPair.RequestsWaiting.Add(0)
+    qs.reqsObsPair.RequestsExecuting.Add(0)
+    qs.execSeatsObs.Add(0)
 }
 
 func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
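UpdateObservations adds zero to each observer: that leaves the tracked values unchanged but, under the sample-and-watermark scheme, lets every observer emit samples for any sampling periods that have elapsed since it was last touched, so the histograms stay current during idle stretches. Presumably something calls this periodically; a hypothetical driver (not shown in this diff) using k8s.io/apimachinery's wait package might look like:

    // Hypothetical: nudge the observers every 10ms until stopCh closes.
    go wait.Until(qs.UpdateObservations, 10*time.Millisecond, stopCh)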
@@ -445,7 +445,7 @@ func TestNoRestraint(t *testing.T) {
     t.Run(testCase.name, func(t *testing.T) {
         now := time.Now()
         clk, counter := testeventclock.NewFake(now, 0, nil)
-        nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
+        nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk), newExecSeatsObserver(clk))
         if err != nil {
             t.Fatal(err)
         }
@@ -481,7 +481,7 @@ func TestBaseline(t *testing.T) {
         HandSize:         3,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -550,7 +550,7 @@ func TestSeparations(t *testing.T) {
         HandSize:         3,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -589,7 +589,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
         HandSize:         1,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -626,7 +626,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
         HandSize:         3,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -662,7 +662,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
         HandSize:         1,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -702,7 +702,7 @@ func TestSeatSecondsRollover(t *testing.T) {
         HandSize:         1,
         RequestWaitLimit: 40 * Quarter,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -740,7 +740,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
         HandSize:         1,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -777,7 +777,7 @@ func TestDifferentWidths(t *testing.T) {
         HandSize:         7,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -813,7 +813,7 @@ func TestTooWide(t *testing.T) {
         HandSize:         7,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -874,7 +874,7 @@ func TestWindup(t *testing.T) {
         HandSize:         1,
         RequestWaitLimit: 10 * time.Minute,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -909,7 +909,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
         Name:             "TestDifferentFlowsWithoutQueuing",
         DesiredNumQueues: 0,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -945,7 +945,7 @@ func TestTimeout(t *testing.T) {
         HandSize:         1,
         RequestWaitLimit: 0,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -996,7 +996,7 @@ func TestContextCancel(t *testing.T) {
         HandSize:         1,
         RequestWaitLimit: 15 * time.Second,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -1102,7 +1102,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
         DesiredNumQueues: 0,
         RequestWaitLimit: 15 * time.Second,
     }
-    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
     if err != nil {
         t.Fatal(err)
     }
@@ -1357,7 +1357,8 @@ func TestFinishRequestLocked(t *testing.T) {
     qs := &queueSet{
         clock:                    clk,
         estimatedServiceDuration: time.Second,
-        obsPair:                  newObserverPair(clk),
+        reqsObsPair:              newObserverPair(clk),
+        execSeatsObs:             newExecSeatsObserver(clk),
     }
     queue := &queue{
         requests: newRequestFIFO(),
@@ -1463,3 +1464,7 @@ func newFIFO(requests ...*request) fifo {
 func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
     return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
 }
+
+func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver {
+    return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"})
+}
@@ -40,7 +40,7 @@ type noRestraint struct{}
 
 type noRestraintRequest struct{}
 
-func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) {
     return noRestraintCompleter{}, nil
 }
@@ -56,7 +56,8 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
             HandSize:         hs,
             QueueLengthLimit: 5}
     }
-    _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}))
+    labelVals := []string{"test"}
+    _, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, labelVals))
     if err != nil {
         panic(err)
     }
@@ -104,6 +104,28 @@ var (
         },
         []string{priorityLevel, flowSchema},
     )
+    // PriorityLevelExecutionSeatsObserverGenerator creates observers of seats occupied throughout execution for priority levels
+    PriorityLevelExecutionSeatsObserverGenerator = NewSampleAndWaterMarkHistogramsGenerator(clock.RealClock{}, time.Millisecond,
+        &compbasemetrics.HistogramOpts{
+            Namespace:      namespace,
+            Subsystem:      subsystem,
+            Name:           "priority_level_seat_count_samples",
+            Help:           "Periodic observations of the number of occupied seats",
+            Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
+            ConstLabels:    map[string]string{phase: "executing"},
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        &compbasemetrics.HistogramOpts{
+            Namespace:      namespace,
+            Subsystem:      subsystem,
+            Name:           "priority_level_seat_count_watermarks",
+            Help:           "Watermarks of the number of occupied seats",
+            Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
+            ConstLabels:    map[string]string{phase: "executing"},
+            StabilityLevel: compbasemetrics.ALPHA,
+        },
+        []string{priorityLevel},
+    )
     // PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels
     PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
         &compbasemetrics.HistogramOpts{
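Each generator stamps out one observer per set of label values; the Generate(1, 1, labelValues) calls sprinkled through this commit pass initial values for the observed quantity and its denominator. A hedged usage sketch, relying only on the Generate, SetX1, and Add calls that appear elsewhere in this diff:

    obs := PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"workload-low"})
    obs.SetX1(10) // denominator: the level's concurrency limit
    obs.Add(3)    // 3 seats become occupied; subsequent samples record 0.3
    obs.Add(-3)   // the seats are released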
@@ -295,6 +317,7 @@ var (
         apiserverRequestExecutionSeconds,
         apiserverEpochAdvances,
     }.
+        Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...).
         Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...).
         Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...)
 )
@@ -79,7 +79,7 @@ type sampleAndWaterMarkObserverGenerator struct {
     waterMarks *compbasemetrics.HistogramVec
 }
 
-var _ TimedObserverGenerator = (*sampleAndWaterMarkObserverGenerator)(nil)
+var _ TimedObserverGenerator = SampleAndWaterMarkObserverGenerator{}
 
 // NewSampleAndWaterMarkHistogramsGenerator makes a new one
 func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {
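The rewritten var line is Go's compile-time interface assertion, updated because NewSampleAndWaterMarkHistogramsGenerator now returns the exported SampleAndWaterMarkObserverGenerator value type rather than a pointer to the unexported struct. The idiom in isolation, with toy types:

    type Walker interface{ Walk() }

    type Robot struct{}

    func (Robot) Walk() {}

    var _ Walker = Robot{}       // asserts the value type implements Walker
    var _ Walker = (*Robot)(nil) // asserts the pointer type implements Walker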