Keep the progress meter R from overflowing
Also add a test for that situation.
commit a797fbd96d (parent b28bf04cd0)
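The bug being fixed: the queueset's global progress meter R (qs.currentR) is a SeatSeconds, a fixed-point quantity that the tests below treat as a uint64 and that only ever increases, so a long-lived apiserver eventually wraps it around; the old syncTimeLocked could only log that it had happened. A minimal standalone sketch of the wraparound (not the apiserver code; ssScale = 1e8 is an assumption inferred from the 8-decimal test expectations later in this diff):

    package main

    import (
    	"fmt"
    	"math"
    )

    // SeatSeconds modeled as a uint64 counting ssScale-ths of a seat-second,
    // mirroring the uint64(testCase.ss) casts in the tests below.
    type SeatSeconds uint64

    const ssScale = 1e8 // assumed scale: 10^8 units per seat-second

    func main() {
    	r := SeatSeconds(math.MaxUint64) - 5 // meter near the top of its range
    	incr := SeatSeconds(10)              // a modest increment
    	wrapped := r + incr                  // uint64 addition silently wraps
    	fmt.Println(wrapped < r)             // true: the meter "jumped backward"
    }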
@@ -182,7 +182,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
 		}
 		qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
 	}
-	qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg)
+	qs.setConfiguration(context.Background(), qsc.qCfg, qsc.dealer, dCfg)
 	return qs
 }
 
@@ -210,8 +210,8 @@ func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetComplet
 // Update handling for when fields are updated is handled here as well -
 // eg: if DesiredNum is increased, SetConfiguration reconciles by
 // adding more queues.
-func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {
-	qs.lockAndSyncTime()
+func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {
+	qs.lockAndSyncTime(ctx)
 	defer qs.lock.Unlock()
 
 	if qCfg.DesiredNumQueues > 0 {
@@ -260,7 +260,7 @@ const (
 // The queueSet's promiseFactory is invoked once if the returned Request is non-nil,
 // not invoked if the Request is nil.
 func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
-	qs.lockAndSyncTime()
+	qs.lockAndSyncTime(ctx)
 	defer qs.lock.Unlock()
 	var req *request
 
@@ -350,7 +350,7 @@ func (req *request) wait() (bool, bool) {
 	// The final step is to wait on a decision from
 	// somewhere and then act on it.
 	decisionAny := req.decision.Get()
-	qs.lockAndSyncTime()
+	qs.lockAndSyncTime(req.ctx)
 	defer qs.lock.Unlock()
 	if req.waitStarted {
 		// This can not happen, because the client is forbidden to
@@ -396,27 +396,74 @@ func (qs *queueSet) isIdleLocked() bool {
 // lockAndSyncTime acquires the lock and updates the virtual time.
 // Doing them together avoids the mistake of modifying some queue state
 // before calling syncTimeLocked.
-func (qs *queueSet) lockAndSyncTime() {
+func (qs *queueSet) lockAndSyncTime(ctx context.Context) {
 	qs.lock.Lock()
-	qs.syncTimeLocked()
+	qs.syncTimeLocked(ctx)
 }
 
 // syncTimeLocked updates the virtual time based on the assumption
 // that the current state of the queues has been in effect since
 // `qs.lastRealTime`. Thus, it should be invoked after acquiring the
 // lock and before modifying the state of any queue.
-func (qs *queueSet) syncTimeLocked() {
+func (qs *queueSet) syncTimeLocked(ctx context.Context) {
 	realNow := qs.clock.Now()
 	timeSinceLast := realNow.Sub(qs.lastRealTime)
 	qs.lastRealTime = realNow
 	prevR := qs.currentR
-	qs.currentR += SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
-	if qs.currentR < prevR {
-		klog.ErrorS(errors.New("progress meter wrapped around"), "Wrap", "QS", qs.qCfg.Name, "prevR", prevR, "currentR", qs.currentR)
+	incrR := SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
+	qs.currentR = prevR + incrR
+	switch {
+	case prevR > qs.currentR:
+		klog.ErrorS(errors.New("queueset::currentR overflow"), "Overflow", "QS", qs.qCfg.Name, "when", realNow.Format(nsTimeFmt), "prevR", prevR, "incrR", incrR, "currentR", qs.currentR)
+	case qs.currentR >= highR:
+		qs.advanceEpoch(ctx, realNow, incrR)
 	}
 	metrics.SetCurrentR(qs.qCfg.Name, qs.currentR.ToFloat())
 }
 
+// rDecrement is the amount by which the progress meter R is wound backwards
+// when needed to avoid overflow.
+const rDecrement = MaxSeatSeconds / 2
+
+// highR is the threshold that triggers advance of the epoch.
+// That is, decrementing the global progress meter R by rDecrement.
+const highR = rDecrement + rDecrement/2
+
+// advanceEpoch subtracts rDecrement from the global progress meter R
+// and all the readings that have been taken from that meter.
+// The now and incrR parameters are only used to add info to the log messages.
+func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR SeatSeconds) {
+	oldR := qs.currentR
+	qs.currentR -= rDecrement
+	klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR)
+	success := true
+	for qIdx, queue := range qs.queues {
+		if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
+			// Do not just decrement, the value could be quite outdated.
+			// It is safe to reset to zero in this case, because the next request
+			// will overwrite the zero with `qs.currentR`.
+			queue.nextDispatchR = 0
+			continue
+		}
+		oldNextDispatchR := queue.nextDispatchR
+		queue.nextDispatchR -= rDecrement
+		if queue.nextDispatchR > oldNextDispatchR {
+			klog.ErrorS(errors.New("queue::nextDispatchR underflow"), "Underflow", "QS", qs.qCfg.Name, "queue", qIdx, "oldNextDispatchR", oldNextDispatchR, "newNextDispatchR", queue.nextDispatchR, "incrR", incrR)
+			success = false
+		}
+		queue.requests.Walk(func(req *request) bool {
+			oldArrivalR := req.arrivalR
+			req.arrivalR -= rDecrement
+			if req.arrivalR > oldArrivalR {
+				klog.ErrorS(errors.New("request::arrivalR underflow"), "Underflow", "QS", qs.qCfg.Name, "queue", qIdx, "request", *req, "oldArrivalR", oldArrivalR, "incrR", incrR)
+				success = false
+			}
+			return true
+		})
+	}
+	metrics.AddEpochAdvance(ctx, qs.qCfg.Name, success)
+}
+
 // getVirtualTimeRatio calculates the rate at which virtual time has
 // been advancing, according to the logic in `doc.go`.
 func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
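Back-of-the-envelope numbers for the new constants (a sketch, assuming SeatSeconds spans the full uint64 range and ssScale = 1e8 as the String() tests below imply): the whole range is about 1.8e11 seat-seconds, the epoch advances once R crosses highR at 3/4 of that, and each advance wins back rDecrement, half the range, as headroom:

    package main

    import (
    	"fmt"
    	"math"
    )

    func main() {
    	const ssScale = 1e8                        // assumed scale of SeatSeconds
    	maxSS := float64(math.MaxUint64) / ssScale // ~1.84e11 seat-seconds
    	rDecrement := maxSS / 2
    	highR := rDecrement + rDecrement/2 // 3/4 of the range
    	fmt.Printf("advance triggers at ~%.3g seat-seconds\n", highR)
    	fmt.Printf("headroom left at the trigger point: ~%.3g seat-seconds\n", maxSS-highR)
    	fmt.Printf("headroom right after an advance: ~%.3g seat-seconds\n", maxSS-(highR-rDecrement))
    }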
@@ -774,7 +821,7 @@ func ssMax(a, b SeatSeconds) SeatSeconds {
 // once a request finishes execution or is canceled. This returns a bool
 // indicating whether the QueueSet is now idle.
 func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool {
-	qs.lockAndSyncTime()
+	qs.lockAndSyncTime(req.ctx)
 	defer qs.lock.Unlock()
 
 	qs.finishRequestLocked(req)
@@ -835,7 +882,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 		// AdditionalLatency elapses, this ensures that the additional
 		// latency has no impact on the user experience.
 		qs.clock.EventAfterDuration(func(_ time.Time) {
-			qs.lockAndSyncTime()
+			qs.lockAndSyncTime(r.ctx)
 			defer qs.lock.Unlock()
 			now := qs.clock.Now()
 			releaseSeatsLocked()
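A note on the underflow checks in advanceEpoch above: nextDispatchR and arrivalR are unsigned, so subtracting rDecrement from a reading smaller than rDecrement wraps to a huge value, which is why `new > old` after the subtraction is the detection condition. A minimal demonstration with arbitrary values:

    package main

    import "fmt"

    func main() {
    	const rDecrement = uint64(1) << 63    // stand-in for MaxSeatSeconds / 2
    	oldReading := uint64(42)              // a reading smaller than rDecrement
    	newReading := oldReading - rDecrement // wraps around to a huge value
    	fmt.Println(newReading > oldReading)  // true: underflow detected
    }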
@@ -179,6 +179,7 @@ type uniformScenario struct
 	clk                   *testeventclock.Fake
 	counter               counter.GoRoutineCounter
 	padConstrains         bool
+	expectedEpochAdvances int
 }
 
 func (us uniformScenario) exercise(t *testing.T) {
@@ -405,6 +406,18 @@ func (uss *uniformScenarioState) finalReview() {
 			uss.t.Log("Success with" + e)
 		}
 	}
+	e := ""
+	if uss.expectedEpochAdvances > 0 {
+		e = fmt.Sprintf(` # HELP apiserver_flowcontrol_epoch_advance_total [ALPHA] Number of times the queueset's progress meter jumped backward
+# TYPE apiserver_flowcontrol_epoch_advance_total counter
+apiserver_flowcontrol_epoch_advance_total{priority_level=%q,success=%q} %d%s`, uss.name, "true", uss.expectedEpochAdvances, "\n")
+	}
+	err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_epoch_advance_total")
+	if err != nil {
+		uss.t.Error(err)
+	} else {
+		uss.t.Logf("Success with apiserver_flowcontrol_epoch_advance_total = %d", uss.expectedEpochAdvances)
+	}
 }
 
 func TestMain(m *testing.M) {
@@ -647,7 +660,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 	}.exercise(t)
 }
 
-// TestSeatSecondsRollover demonstrates that SeatSeconds overflow can cause bad stuff to happen.
+// TestSeatSecondsRollover checks that there is not a problem with SeatSeconds overflow.
 func TestSeatSecondsRollover(t *testing.T) {
 	metrics.Register()
 	now := time.Now()
@@ -677,13 +690,14 @@ func TestSeatSecondsRollover(t *testing.T) {
 		},
 		concurrencyLimit:       2000,
 		evalDuration:           Quarter * 40,
-		expectedFair:           []bool{false},
+		expectedFair:           []bool{true},
 		expectedFairnessMargin: []float64{0.01},
 		expectAllRequests:      true,
 		evalInqueueMetrics:     true,
 		evalExecutingMetrics:   true,
 		clk:                    clk,
 		counter:                counter,
+		expectedEpochAdvances:  8,
 	}.exercise(t)
 }
 
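Where expectedEpochAdvances: 8 comes from, given the constants above: the first advance fires when R reaches highR = 0.75 * MaxSeatSeconds, and each advance subtracts rDecrement = 0.5 * MaxSeatSeconds, so the n-th advance fires at cumulative progress (0.75 + 0.5*(n-1)) * MaxSeatSeconds; eight advances therefore mean total progress over the 40 simulated quarters landed somewhere in [4.25, 4.75) * MaxSeatSeconds. A sketch of that arithmetic (the roughly-linear-progress reading is my assumption):

    package main

    import "fmt"

    // advances returns how many epoch advances happen once cumulative progress
    // reaches p, measured in units of MaxSeatSeconds.
    func advances(p float64) int {
    	if p < 0.75 {
    		return 0
    	}
    	return 1 + int((p-0.75)/0.5)
    }

    func main() {
    	fmt.Println(advances(4.5))  // 8, matching expectedEpochAdvances
    	fmt.Println(advances(4.25)) // 8 (lower edge of the band)
    	fmt.Println(advances(4.74)) // 8 (just below the upper edge)
    }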
@@ -17,29 +17,33 @@ limitations under the License.
 package queueset
 
 import (
-	"fmt"
 	"math"
 	"testing"
 	"time"
 )
 
+// TestSeatSecondsString exercises the SeatSeconds constructor and de-constructors (String, ToFloat).
 func TestSeatSecondsString(t *testing.T) {
-	digits := math.Log10(ssScale)
-	expectFmt := fmt.Sprintf("%%%d.%dfss", int(digits+2), int(digits))
 	testCases := []struct {
 		ss          SeatSeconds
-		expect      string
+		expectFloat float64
+		expectStr   string
 	}{
-		{ss: SeatSeconds(1), expect: fmt.Sprintf(expectFmt, 1.0/ssScale)},
-		{ss: 0, expect: "0.00000000ss"},
-		{ss: SeatsTimesDuration(1, time.Second), expect: "1.00000000ss"},
-		{ss: SeatsTimesDuration(123, 100*time.Millisecond), expect: "12.30000000ss"},
-		{ss: SeatsTimesDuration(1203, 10*time.Millisecond), expect: "12.03000000ss"},
+		{ss: SeatSeconds(1), expectFloat: 1.0 / ssScale, expectStr: "0.00000001ss"},
+		{ss: SeatSeconds(ssScale - 1), expectFloat: (ssScale - 1) / ssScale, expectStr: "0.99999999ss"},
+		{ss: 0, expectFloat: 0, expectStr: "0.00000000ss"},
+		{ss: SeatsTimesDuration(1, time.Second), expectFloat: 1, expectStr: "1.00000000ss"},
+		{ss: SeatsTimesDuration(123, 100*time.Millisecond), expectFloat: 12.3, expectStr: "12.30000000ss"},
+		{ss: SeatsTimesDuration(1203, 10*time.Millisecond), expectFloat: 12.03, expectStr: "12.03000000ss"},
 	}
 	for _, testCase := range testCases {
 		actualStr := testCase.ss.String()
-		if actualStr != testCase.expect {
-			t.Errorf("SeatSeonds(%d) formatted as %q rather than expected %q", uint64(testCase.ss), actualStr, testCase.expect)
+		if actualStr != testCase.expectStr {
+			t.Errorf("SeatSeconds(%d).String() is %q but expected %q", uint64(testCase.ss), actualStr, testCase.expectStr)
+		}
+		actualFloat := testCase.ss.ToFloat()
+		if math.Round(actualFloat*ssScale) != math.Round(testCase.expectFloat*ssScale) {
+			t.Errorf("SeatSeconds(%d).ToFloat() is %v but expected %v", uint64(testCase.ss), actualFloat, testCase.expectFloat)
 		}
 	}
 }
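For readers decoding those expectations: SeatSeconds is fixed-point, so SeatSeconds(1) is the smallest representable quantum (hence "0.00000001ss"), and 123 seats held for 100ms come to 12.3 seat-seconds. A standalone sketch of the conversions (helper names here are mine, not the package's; the real API is SeatsTimesDuration, String, and ToFloat, and ssScale = 1e8 is inferred from the 8-decimal strings):

    package main

    import (
    	"fmt"
    	"math"
    	"time"
    )

    const ssScale = 1e8 // assumed: 10^8 fixed-point units per seat-second

    func seatsTimesDuration(seats float64, d time.Duration) uint64 {
    	return uint64(math.Round(seats * d.Seconds() * ssScale))
    }

    func toFloat(units uint64) float64 { return float64(units) / ssScale }

    func main() {
    	units := seatsTimesDuration(123, 100*time.Millisecond)
    	fmt.Printf("%d units = %.8fss\n", units, toFloat(units))
    	// Output: 1230000000 units = 12.30000000ss
    }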
@@ -62,6 +62,7 @@ type request struct {
 	arrivalTime time.Time
 
 	// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
+	// This field is meaningful only while the request is waiting in the virtual world.
 	arrivalR SeatSeconds
 
 	// descr1 and descr2 are not used in any logic but they appear in
@@ -18,6 +18,7 @@ package metrics
 
 import (
 	"context"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -103,7 +104,6 @@ var (
 		},
 		[]string{priorityLevel, flowSchema},
 	)
-
 	// PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels
 	PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
 		&compbasemetrics.HistogramOpts{
@@ -122,8 +122,8 @@ var (
 			Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
-		[]string{priorityLevel})
-
+		[]string{priorityLevel},
+	)
 	// ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly
 	ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
 		&compbasemetrics.HistogramOpts{
@@ -142,8 +142,8 @@ var (
 			Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
 			StabilityLevel: compbasemetrics.ALPHA,
 		},
-		[]string{requestKind})
-
+		[]string{requestKind},
+	)
 	apiserverCurrentR = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
@@ -154,7 +154,6 @@ var (
 		},
 		[]string{priorityLevel},
 	)
-
 	apiserverDispatchR = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
@@ -165,7 +164,6 @@ var (
 		},
 		[]string{priorityLevel},
 	)
-
 	apiserverLatestS = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
@@ -176,7 +174,6 @@ var (
 		},
 		[]string{priorityLevel},
 	)
-
 	apiserverNextSBounds = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
@@ -187,7 +184,6 @@ var (
 		},
 		[]string{priorityLevel, "bound"},
 	)
-
 	apiserverNextDiscountedSBounds = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
@@ -198,7 +194,6 @@ var (
 		},
 		[]string{priorityLevel, "bound"},
 	)
-
 	apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
 			Namespace: namespace,
@@ -272,6 +267,17 @@ var (
 		},
 		[]string{priorityLevel, flowSchema},
 	)
+	apiserverEpochAdvances = compbasemetrics.NewCounterVec(
+		&compbasemetrics.CounterOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "epoch_advance_total",
+			Help:           "Number of times the queueset's progress meter jumped backward",
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{priorityLevel, "success"},
+	)
+
 	metrics = Registerables{
 		apiserverRejectedRequestsTotal,
 		apiserverDispatchedRequestsTotal,
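When scraped, the new counter renders as in the expectation embedded in the test above; the priority level and count here are illustrative:

    # HELP apiserver_flowcontrol_epoch_advance_total [ALPHA] Number of times the queueset's progress meter jumped backward
    # TYPE apiserver_flowcontrol_epoch_advance_total counter
    apiserver_flowcontrol_epoch_advance_total{priority_level="workload-low",success="true"} 8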
@@ -287,6 +293,7 @@ var (
 		apiserverCurrentExecutingRequests,
 		apiserverRequestWaitingSeconds,
 		apiserverRequestExecutionSeconds,
+		apiserverEpochAdvances,
 	}.
 		Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...).
 		Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...)
@@ -352,3 +359,7 @@ func ObserveWaitingDuration(ctx context.Context, priorityLevel, flowSchema, exec
 func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema string, executionTime time.Duration) {
 	apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
 }
+
+func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) {
+	apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc()
+}
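For orientation, a standalone sketch of the same counter shape written against plain prometheus/client_golang rather than k8s.io/component-base/metrics (registry, names, and label values illustrative); the point is that AddEpochAdvance folds the boolean outcome of advanceEpoch into the success label via strconv.FormatBool:

    package main

    import (
    	"fmt"
    	"strconv"

    	"github.com/prometheus/client_golang/prometheus"
    )

    func main() {
    	epochAdvances := prometheus.NewCounterVec(
    		prometheus.CounterOpts{
    			Name: "apiserver_flowcontrol_epoch_advance_total",
    			Help: "Number of times the queueset's progress meter jumped backward",
    		},
    		[]string{"priority_level", "success"},
    	)
    	reg := prometheus.NewRegistry()
    	reg.MustRegister(epochAdvances)

    	// Mirrors AddEpochAdvance(ctx, "workload-low", true).
    	epochAdvances.WithLabelValues("workload-low", strconv.FormatBool(true)).Inc()
    	fmt.Println("epoch advance recorded")
    }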