apf: remove RequestWaitLimit from queueset config

Abu Kashem 2023-08-29 12:11:08 -04:00
parent da8a472206
commit 11ef9514da
GPG Key ID: E5ECC1124B5F9C68
14 changed files with 21 additions and 60 deletions
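
In effect, the request wait limit (previously derived as RequestTimeout/4) is no longer threaded from the server options into the APF machinery, and the flow-control constructor loses that parameter. A minimal sketch of a post-change call site, assuming a pre-built clientset; the package and helper names below are illustrative, not part of this commit:

package apfexample

import (
    utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
)

// buildAPF shows the reduced utilflowcontrol.New signature: informer factory,
// flowcontrol client, and server concurrency limit; the former
// requestWaitLimit argument is gone.
func buildAPF(clientset kubernetes.Interface, maxRequestsInFlight, maxMutatingRequestsInFlight int) utilflowcontrol.Interface {
    factory := kubeinformers.NewSharedInformerFactory(clientset, 0) // resync disabled for this sketch
    return utilflowcontrol.New(
        factory,
        clientset.FlowcontrolV1beta3(),
        maxRequestsInFlight+maxMutatingRequestsInFlight, // server concurrency limit
    )
}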

View File

@@ -196,7 +196,6 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
 versionedInformer,
 extclient.FlowcontrolV1beta3(),
 s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
-s.GenericServerRunOptions.RequestTimeout/4,
 ), nil
 }

View File

@@ -687,7 +687,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
 apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 stopCh := make(chan struct{})
-controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
 headerMatcher := headerMatcher{}
 var executed bool
@@ -757,7 +757,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
 apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 stopCh := make(chan struct{})
-controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
 headerMatcher := headerMatcher{}
 var executed bool
@@ -833,7 +833,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
 apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 stopCh := make(chan struct{})
-controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
 headerMatcher := headerMatcher{}
 var innerHandlerWriteErr error
@@ -911,7 +911,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
 apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, 0)
 stopCh := make(chan struct{})
-controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
 headerMatcher := headerMatcher{}
 var innerHandlerWriteErr error
@@ -986,7 +986,7 @@ func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
 apfConfiguration := newConfiguration(fsName, plName, userName, plConcurrencyShares, queueLength)
 stopCh := make(chan struct{})
-controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, requestTimeout/4, plName, plConcurrency)
+controller, controllerCompletedCh := startAPFController(t, stopCh, apfConfiguration, serverConcurrency, plName, plConcurrency)
 headerMatcher := headerMatcher{}
 var firstRequestInnerHandlerWriteErr error
@@ -1118,11 +1118,11 @@ func fmtError(err error) string {
 }
 func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration []runtime.Object, serverConcurrency int,
-requestWaitLimit time.Duration, plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) {
+plName string, plConcurrency int) (utilflowcontrol.Interface, <-chan error) {
 clientset := newClientset(t, apfConfiguration...)
 // this test does not rely on resync, so resync period is set to zero
 factory := informers.NewSharedInformerFactory(clientset, 0)
-controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency, requestWaitLimit)
+controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta3(), serverConcurrency)
 factory.Start(stopCh)

View File

@@ -154,7 +154,6 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
 config.SharedInformerFactory,
 kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(),
 config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
-config.RequestTimeout/4,
 )
 } else {
 klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled")

View File

@@ -150,9 +150,6 @@ type configController struct {
 // from server configuration.
 serverConcurrencyLimit int
-// requestWaitLimit comes from server configuration.
-requestWaitLimit time.Duration
 // watchTracker implements the necessary WatchTracker interface.
 WatchTracker
@@ -287,13 +284,12 @@ func newTestableController(config TestableConfig) *configController {
 asFieldManager: config.AsFieldManager,
 foundToDangling: config.FoundToDangling,
 serverConcurrencyLimit: config.ServerConcurrencyLimit,
-requestWaitLimit: config.RequestWaitLimit,
 flowcontrolClient: config.FlowcontrolClient,
 priorityLevelStates: make(map[string]*priorityLevelState),
 WatchTracker: NewWatchTracker(),
 MaxSeatsTracker: NewMaxSeatsTracker(),
 }
-klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
+klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
 // Start with longish delay because conflicts will be between
 // different processes, so take some time to go away.
 cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
@@ -433,7 +429,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
 plState := plStates[plName]
 if setCompleters {
 qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
-plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
+plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
 metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
 if err != nil {
 klog.ErrorS(err, "Inconceivable! Configuration error in existing priority level", "pl", plState.pl)
@@ -657,10 +653,10 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro
 // Supply missing mandatory PriorityLevelConfiguration objects
 if !meal.haveExemptPL {
-meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit)
+meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt)
 }
 if !meal.haveCatchAllPL {
-meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit)
+meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll)
 }
 meal.finishQueueSetReconfigsLocked()
@@ -692,7 +688,7 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 }
 }
 qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues,
-pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs,
+pl, state.reqsGaugePair, state.execSeatsObs,
 metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge))
 if err != nil {
 klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
@@ -798,7 +794,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
 }
 var err error
 plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues,
-plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
+plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
 metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
 if err != nil {
 // This can not happen because queueSetCompleterForPL already approved this config
@@ -880,7 +876,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
 // given priority level configuration. Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
-func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
+func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
 if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) {
 return nil, errors.New("broken union structure at the top, for Limited")
 }
@@ -902,7 +898,6 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
 DesiredNumQueues: int(qcAPI.Queues),
 QueueLengthLimit: int(qcAPI.QueueLengthLimit),
 HandSize: int(qcAPI.HandSize),
-RequestWaitLimit: requestWaitLimit,
 }
 }
 } else {
@@ -956,16 +951,15 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
 // imaginePL adds a priority level based on one of the mandatory ones
 // that does not actually exist (right now) as a real API object.
-func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
+func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration) {
 klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
 labelValues := []string{proto.Name}
 reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
 execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
 seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
 seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name})
-qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto,
-requestWaitLimit, reqsGaugePair, execSeatsObs,
-metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
+qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, reqsGaugePair,
+execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
 if err != nil {
 // This can not happen because proto is one of the mandatory
 // objects and these are not erroneous

View File

@@ -90,7 +90,6 @@ func New(
 informerFactory kubeinformers.SharedInformerFactory,
 flowcontrolClient flowcontrolclient.FlowcontrolV1beta3Interface,
 serverConcurrencyLimit int,
-requestWaitLimit time.Duration,
 ) Interface {
 clk := eventclock.Real{}
 return NewTestable(TestableConfig{
@@ -101,7 +100,6 @@ func New(
 InformerFactory: informerFactory,
 FlowcontrolClient: flowcontrolClient,
 ServerConcurrencyLimit: serverConcurrencyLimit,
-RequestWaitLimit: requestWaitLimit,
 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
 QueueSetFactory: fqs.NewQueueSetFactory(clk),
@@ -139,9 +137,6 @@ type TestableConfig struct {
 // ServerConcurrencyLimit for the controller to enforce
 ServerConcurrencyLimit int
-// RequestWaitLimit configured on the server
-RequestWaitLimit time.Duration
 // GaugeVec for metrics about requests, broken down by phase and priority_level
 ReqsGaugeVec metrics.RatioedGaugeVec
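
Callers that construct the controller from a TestableConfig drop the field the same way. A hedged sketch using the exported NewTestable; only fields visible in this diff are shown, and the real tests set additional fields (Name, Clock, and so on):

package apfexample

import (
    utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
    fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
    "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
    kubeinformers "k8s.io/client-go/informers"
    flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
)

// newTestAPF is a hypothetical helper; the informer factory, client, and
// queue-set factory come from the surrounding test setup.
func newTestAPF(factory kubeinformers.SharedInformerFactory, fcClient flowcontrolclient.FlowcontrolV1beta3Interface, qsf fq.QueueSetFactory) utilflowcontrol.Interface {
    return utilflowcontrol.NewTestable(utilflowcontrol.TestableConfig{
        InformerFactory:        factory,
        FlowcontrolClient:      fcClient,
        ServerConcurrencyLimit: 24, // illustrative value, as in the tests below
        // RequestWaitLimit is no longer a field of TestableConfig.
        ReqsGaugeVec:      metrics.PriorityLevelConcurrencyGaugeVec,
        ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
        QueueSetFactory:   qsf,
    })
}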

View File

@@ -109,7 +109,6 @@ func TestQueueWaitTimeLatencyTracker(t *testing.T) {
 InformerFactory: informerFactory,
 FlowcontrolClient: flowcontrolClient,
 ServerConcurrencyLimit: 24,
-RequestWaitLimit: time.Minute,
 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
 QueueSetFactory: fqs.NewQueueSetFactory(clk),

View File

@@ -143,7 +143,6 @@ func TestBorrowing(t *testing.T) {
 InformerFactory: informerFactory,
 FlowcontrolClient: flowcontrolClient,
 ServerConcurrencyLimit: 24,
-RequestWaitLimit: time.Minute,
 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
 QueueSetFactory: fqs.NewQueueSetFactory(clk),

View File

@@ -251,8 +251,7 @@ func TestConfigConsumer(t *testing.T) {
 FoundToDangling: func(found bool) bool { return !found },
 InformerFactory: informerFactory,
 FlowcontrolClient: flowcontrolClient,
 ServerConcurrencyLimit: 100, // server concurrency limit
-RequestWaitLimit: time.Minute, // request wait limit
 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
 QueueSetFactory: cts,
@@ -384,7 +383,6 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
 InformerFactory: informerFactory,
 FlowcontrolClient: flowcontrolClient,
 ServerConcurrencyLimit: 100,
-RequestWaitLimit: time.Minute,
 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
 QueueSetFactory: cts,

View File

@@ -18,7 +18,6 @@ package fairqueuing
 import (
 "context"
-"time"
 "k8s.io/apiserver/pkg/util/flowcontrol/debug"
 "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
@@ -117,7 +116,7 @@ type QueuingConfig struct {
 // DesiredNumQueues is the number of queues that the API says
 // should exist now. This may be non-positive, in which case
-// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
+// QueueLengthLimit, and HandSize are ignored.
 // A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig.
 // A negative value means to always dispatch immediately upon arrival
 // (i.e., the requests are "exempt" from limitation).
@@ -129,10 +128,6 @@ type QueuingConfig struct {
 // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
 // dealing a "hand" of this many queues and then picking one of minimum length.
 HandSize int
-// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
-// If, by the end of that time, the request has not been dispatched then it is rejected.
-RequestWaitLimit time.Duration
 }
 // DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.
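
With RequestWaitLimit gone, a QueuingConfig carries only the queue-shape and shuffle-sharding parameters. A minimal sketch with illustrative values in the style of the queueset tests below (the package and helper names are not from the commit):

package apfexample

import (
    fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
)

// exampleQueuingConfig shows the post-change shape of fq.QueuingConfig.
func exampleQueuingConfig() fq.QueuingConfig {
    return fq.QueuingConfig{
        Name:             "example",
        DesiredNumQueues: 9,
        QueueLengthLimit: 8,
        HandSize:         3,
        // RequestWaitLimit no longer exists on this struct.
    }
}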

View File

@@ -272,7 +272,6 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 } else {
 qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit
 qCfg.HandSize = qs.qCfg.HandSize
-qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit
 }
 qs.qCfg = qCfg

View File

@@ -551,7 +551,6 @@ func TestBaseline(t *testing.T) {
 DesiredNumQueues: 9,
 QueueLengthLimit: 8,
 HandSize: 3,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -590,7 +589,6 @@ func TestExampt(t *testing.T) {
 DesiredNumQueues: -1,
 QueueLengthLimit: 2,
 HandSize: 3,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -663,7 +661,6 @@ func TestSeparations(t *testing.T) {
 DesiredNumQueues: 9,
 QueueLengthLimit: 8,
 HandSize: 3,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, caseName+" seatDemandSubject")
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -704,7 +701,6 @@ func TestUniformFlowsHandSize1(t *testing.T) {
 DesiredNumQueues: 9,
 QueueLengthLimit: 8,
 HandSize: 1,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, "seatDemandSubject")
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -743,7 +739,6 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 DesiredNumQueues: 8,
 QueueLengthLimit: 16,
 HandSize: 3,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -781,7 +776,6 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 DesiredNumQueues: 9,
 QueueLengthLimit: 8,
 HandSize: 1,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -823,7 +817,6 @@ func TestSeatSecondsRollover(t *testing.T) {
 DesiredNumQueues: 9,
 QueueLengthLimit: 8,
 HandSize: 1,
-RequestWaitLimit: 40 * Quarter,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -863,7 +856,6 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 DesiredNumQueues: 9,
 QueueLengthLimit: 6,
 HandSize: 1,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -902,7 +894,6 @@ func TestDifferentWidths(t *testing.T) {
 DesiredNumQueues: 64,
 QueueLengthLimit: 13,
 HandSize: 7,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -940,7 +931,6 @@ func TestTooWide(t *testing.T) {
 DesiredNumQueues: 64,
 QueueLengthLimit: 35,
 HandSize: 7,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -1003,7 +993,6 @@ func TestWindup(t *testing.T) {
 DesiredNumQueues: 9,
 QueueLengthLimit: 6,
 HandSize: 1,
-RequestWaitLimit: 10 * time.Minute,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -1093,7 +1082,6 @@ func TestContextCancel(t *testing.T) {
 DesiredNumQueues: 11,
 QueueLengthLimit: 11,
 HandSize: 1,
-RequestWaitLimit: 15 * time.Second,
 }
 seatDemandIntegratorSubject := fq.NewNamedIntegrator(clk, qCfg.Name)
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), seatDemandIntegratorSubject)
@@ -1200,7 +1188,6 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
 qCfg := fq.QueuingConfig{
 Name: "TestTotalRequestsExecutingWithPanic",
 DesiredNumQueues: 0,
-RequestWaitLimit: 15 * time.Second,
 }
 qsc, err := qsf.BeginConstruction(qCfg, newGaugePair(clk), newExecSeatsGauge(clk), fq.NewNamedIntegrator(clk, qCfg.Name))
 if err != nil {

View File

@@ -21,7 +21,6 @@ import (
 "math/rand"
 "sync/atomic"
 "testing"
-"time"
 "k8s.io/utils/clock"
@@ -60,7 +59,7 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
 QueueLengthLimit: 5}
 }
 labelVals := []string{"test"}
-_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name))
+_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, metrics.RatioedGaugeVecPhasedElementPair(metrics.PriorityLevelConcurrencyGaugeVec, 1, 1, labelVals), metrics.PriorityLevelExecutionSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelVals), fq.NewNamedIntegrator(clock.RealClock{}, name))
 if err != nil {
 panic(err)
 }

View File

@@ -108,7 +108,6 @@ func Test_GetMaxSeats(t *testing.T) {
 // for the purposes of this test, serverCL ~= nominalCL since there is
 // only 1 PL with large concurrency shares, making mandatory PLs negligible.
 ServerConcurrencyLimit: testcase.nominalCL,
-RequestWaitLimit: time.Minute,
 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
 QueueSetFactory: fqs.NewQueueSetFactory(clk),

View File

@@ -139,8 +139,7 @@ func (ft *fightTest) createController(invert bool, i int) {
 AsFieldManager: fieldMgr,
 InformerFactory: informerFactory,
 FlowcontrolClient: fcIfc,
 ServerConcurrencyLimit: 200, // server concurrency limit
-RequestWaitLimit: time.Minute / 4, // request wait limit
 ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
 ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
 QueueSetFactory: fqtesting.NewNoRestraintFactory(),