Define TestableConfig in k/apiserver/pkg/util/flowcontrol

Collect the parameters of newTestableController into a named type.

Also tolerate the surprising situation in which a request's user
groups include neither `system:authenticated` nor
`system:unauthenticated` --- because this is observed to happen in
some tests.

Also a few other minor fixups.
This commit is contained in:
Mike Spreitzer 2020-12-16 18:58:45 -05:00
parent fc43c80ccd
commit 10df6d459b
3 changed files with 64 additions and 60 deletions

View File

@ -42,7 +42,6 @@ import (
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -61,7 +60,7 @@ import (
// undesired becomes completely unused, all the config objects are // undesired becomes completely unused, all the config objects are
// read and processed as a whole. // read and processed as a whole.
// StartFunction begins the process of handlig a request. If the // StartFunction begins the process of handling a request. If the
// request gets queued then this function uses the given hashValue as // request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a // the source of entropy as it shuffle-shards the request into a
// queue. The descr1 and descr2 values play no role in the logic but // queue. The descr1 and descr2 values play no role in the logic but
@ -151,34 +150,22 @@ type priorityLevelState struct {
} }
// NewTestableController is extra flexible to facilitate testing // NewTestableController is extra flexible to facilitate testing
func newTestableController( func newTestableController(config TestableConfig) *configController {
informerFactory kubeinformers.SharedInformerFactory,
flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface,
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
obsPairGenerator metrics.TimedObserverPairGenerator,
queueSetFactory fq.QueueSetFactory,
) *configController {
cfgCtlr := &configController{ cfgCtlr := &configController{
queueSetFactory: queueSetFactory, queueSetFactory: config.QueueSetFactory,
obsPairGenerator: obsPairGenerator, obsPairGenerator: config.ObsPairGenerator,
serverConcurrencyLimit: serverConcurrencyLimit, serverConcurrencyLimit: config.ServerConcurrencyLimit,
requestWaitLimit: requestWaitLimit, requestWaitLimit: config.RequestWaitLimit,
flowcontrolClient: flowcontrolClient, flowcontrolClient: config.FlowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState), priorityLevelStates: make(map[string]*priorityLevelState),
} }
klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit) klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit)
cfgCtlr.initializeConfigController(informerFactory) // Start with longish delay because conflicts will be between
// different processes, so take some time to go away.
cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
// ensure the data structure reflects the mandatory config // ensure the data structure reflects the mandatory config
cfgCtlr.lockAndDigestConfigObjects(nil, nil) cfgCtlr.lockAndDigestConfigObjects(nil, nil)
return cfgCtlr fci := config.InformerFactory.Flowcontrol().V1beta1()
}
// initializeConfigController sets up the controller that processes
// config API objects.
func (cfgCtlr *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) {
cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
fci := informerFactory.Flowcontrol().V1beta1()
pli := fci.PriorityLevelConfigurations() pli := fci.PriorityLevelConfigurations()
fsi := fci.FlowSchemas() fsi := fci.FlowSchemas()
cfgCtlr.plLister = pli.Lister() cfgCtlr.plLister = pli.Lister()
@ -225,6 +212,7 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube
cfgCtlr.configQueue.Add(0) cfgCtlr.configQueue.Add(0)
}}) }})
return cfgCtlr
} }
// MaintainObservations keeps the observers from // MaintainObservations keeps the observers from
@ -330,7 +318,7 @@ type cfgMeal struct {
fsStatusUpdates []fsStatusUpdate fsStatusUpdates []fsStatusUpdate
} }
// A buffered set of status updates for a FlowSchema // A buffered set of status updates for FlowSchemas
type fsStatusUpdate struct { type fsStatusUpdate struct {
flowSchema *flowcontrol.FlowSchema flowSchema *flowcontrol.FlowSchema
condition flowcontrol.FlowSchemaCondition condition flowcontrol.FlowSchemaCondition
@ -349,7 +337,7 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior
panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
} }
klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue)) klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue))
_, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status") _, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: ConfigConsumerAsFieldManager}, "status")
if err != nil { if err != nil {
errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name))) errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name)))
} }
@ -654,30 +642,28 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
klog.V(7).Infof("startRequest(%#+v)", rd) klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.Lock() cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock() defer cfgCtlr.lock.Unlock()
var selectedFlowSchema *flowcontrol.FlowSchema var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
for _, fs := range cfgCtlr.flowSchemas { for _, fs := range cfgCtlr.flowSchemas {
if matchesFlowSchema(rd, fs) { if matchesFlowSchema(rd, fs) {
selectedFlowSchema = fs selectedFlowSchema = fs
break break
} }
if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
catchAllFlowSchema = fs
}
} }
if selectedFlowSchema == nil { if selectedFlowSchema == nil {
// This should never happen. If the requestDigest's User is a part of // This should never happen. If the requestDigest's User is a part of
// system:authenticated or system:unauthenticated, the catch-all flow // system:authenticated or system:unauthenticated, the catch-all flow
// schema should match it. However, if that invariant somehow fails, // schema should match it. However, if that invariant somehow fails,
// fallback to the catch-all flow schema anyway. // fallback to the catch-all flow schema anyway.
for _, fs := range cfgCtlr.flowSchemas { if catchAllFlowSchema == nil {
if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
selectedFlowSchema = fs
break
}
}
if selectedFlowSchema == nil {
// This should absolutely never, ever happen! APF guarantees two // This should absolutely never, ever happen! APF guarantees two
// undeletable flow schemas at all times: an exempt flow schema and a // undeletable flow schemas at all times: an exempt flow schema and a
// catch-all flow schema. // catch-all flow schema.
panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User)) panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User))
} }
selectedFlowSchema = catchAllFlowSchema
klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema)) klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema))
} }
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name

View File

@ -34,6 +34,10 @@ import (
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
) )
// ConfigConsumerAsFieldManager is how the config consuminng
// controller appears in an ObjectMeta ManagedFieldsEntry.Manager
const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-v1"
// Interface defines how the API Priority and Fairness filter interacts with the underlying system. // Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface { type Interface interface {
// Handle takes care of queuing and dispatching a request // Handle takes care of queuing and dispatching a request
@ -74,26 +78,40 @@ func New(
requestWaitLimit time.Duration, requestWaitLimit time.Duration,
) Interface { ) Interface {
grc := counter.NoOp{} grc := counter.NoOp{}
return NewTestable( return NewTestable(TestableConfig{
informerFactory, InformerFactory: informerFactory,
flowcontrolClient, FlowcontrolClient: flowcontrolClient,
serverConcurrencyLimit, ServerConcurrencyLimit: serverConcurrencyLimit,
requestWaitLimit, RequestWaitLimit: requestWaitLimit,
metrics.PriorityLevelConcurrencyObserverPairGenerator, ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
fqs.NewQueueSetFactory(&clock.RealClock{}, grc), QueueSetFactory: fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
) })
}
// TestableConfig carries the parameters to an implementation that is testable
type TestableConfig struct {
// InformerFactory to use in building the controller
InformerFactory kubeinformers.SharedInformerFactory
// FlowcontrolClient to use for manipulating config objects
FlowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface
// ServerConcurrencyLimit for the controller to enforce
ServerConcurrencyLimit int
// RequestWaitLimit configured on the server
RequestWaitLimit time.Duration
// ObsPairGenerator for metrics
ObsPairGenerator metrics.TimedObserverPairGenerator
// QueueSetFactory for the queuing implementation
QueueSetFactory fq.QueueSetFactory
} }
// NewTestable is extra flexible to facilitate testing // NewTestable is extra flexible to facilitate testing
func NewTestable( func NewTestable(config TestableConfig) Interface {
informerFactory kubeinformers.SharedInformerFactory, return newTestableController(config)
flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface,
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
obsPairGenerator metrics.TimedObserverPairGenerator,
queueSetFactory fq.QueueSetFactory,
) Interface {
return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, obsPairGenerator, queueSetFactory)
} }
func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,

View File

@ -230,14 +230,14 @@ func TestConfigConsumer(t *testing.T) {
heldRequestsMap: map[string][]heldRequest{}, heldRequestsMap: map[string][]heldRequest{},
queues: map[string]*ctlrTestQueueSet{}, queues: map[string]*ctlrTestQueueSet{},
} }
ctlr := newTestableController( ctlr := newTestableController(TestableConfig{
informerFactory, InformerFactory: informerFactory,
flowcontrolClient, FlowcontrolClient: flowcontrolClient,
100, // server concurrency limit ServerConcurrencyLimit: 100,
time.Minute, // request wait limit RequestWaitLimit: time.Minute,
metrics.PriorityLevelConcurrencyObserverPairGenerator, ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
cts, QueueSetFactory: cts,
) })
cts.cfgCtlr = ctlr cts.cfgCtlr = ctlr
persistingPLNames := sets.NewString() persistingPLNames := sets.NewString()
trialStep := fmt.Sprintf("trial%d-0", i) trialStep := fmt.Sprintf("trial%d-0", i)