mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 11:13:48 +00:00
Merge pull request #104833 from MikeSpreitzer/fix104811
Refine locking in API Priority and Fairness config controller
This commit is contained in:
commit
9f6a31916a
@ -68,6 +68,14 @@ const timeFmt = "2006-01-02T15:04:05.999"
|
|||||||
// undesired becomes completely unused, all the config objects are
|
// undesired becomes completely unused, all the config objects are
|
||||||
// read and processed as a whole.
|
// read and processed as a whole.
|
||||||
|
|
||||||
|
// The funcs in this package follow the naming convention that the suffix
|
||||||
|
// "Locked" means the relevant mutex must be locked at the start of each
|
||||||
|
// call and will be locked upon return. For a configController, the
|
||||||
|
// suffix "ReadLocked" stipulates a read lock while just "Locked"
|
||||||
|
// stipulates a full lock. Absence of either suffix means that either
|
||||||
|
// (a) the lock must NOT be held at call time and will not be held
|
||||||
|
// upon return or (b) locking is irrelevant.
|
||||||
|
|
||||||
// StartFunction begins the process of handling a request. If the
|
// StartFunction begins the process of handling a request. If the
|
||||||
// request gets queued then this function uses the given hashValue as
|
// request gets queued then this function uses the given hashValue as
|
||||||
// the source of entropy as it shuffle-shards the request into a
|
// the source of entropy as it shuffle-shards the request into a
|
||||||
@ -125,9 +133,25 @@ type configController struct {
|
|||||||
// requestWaitLimit comes from server configuration.
|
// requestWaitLimit comes from server configuration.
|
||||||
requestWaitLimit time.Duration
|
requestWaitLimit time.Duration
|
||||||
|
|
||||||
|
// watchTracker implements the necessary WatchTracker interface.
|
||||||
|
WatchTracker
|
||||||
|
|
||||||
|
// the most recent update attempts, ordered by increasing age.
|
||||||
|
// Consumer trims to keep only the last minute's worth of entries.
|
||||||
|
// The controller uses this to limit itself to at most six updates
|
||||||
|
// to a given FlowSchema in any minute.
|
||||||
|
// This may only be accessed from the one and only worker goroutine.
|
||||||
|
mostRecentUpdates []updateAttempt
|
||||||
|
|
||||||
// This must be locked while accessing flowSchemas or
|
// This must be locked while accessing flowSchemas or
|
||||||
// priorityLevelStates.
|
// priorityLevelStates. A lock for writing is needed
|
||||||
lock sync.Mutex
|
// for writing to any of the following:
|
||||||
|
// - the flowSchemas field
|
||||||
|
// - the slice held in the flowSchemas field
|
||||||
|
// - the priorityLevelStates field
|
||||||
|
// - the map held in the priorityLevelStates field
|
||||||
|
// - any field of a priorityLevelState held in that map
|
||||||
|
lock sync.RWMutex
|
||||||
|
|
||||||
// flowSchemas holds the flow schema objects, sorted by increasing
|
// flowSchemas holds the flow schema objects, sorted by increasing
|
||||||
// numerical (decreasing logical) matching precedence. Every
|
// numerical (decreasing logical) matching precedence. Every
|
||||||
@ -138,16 +162,6 @@ type configController struct {
|
|||||||
// name to the state for that level. Every name referenced from a
|
// name to the state for that level. Every name referenced from a
|
||||||
// member of `flowSchemas` has an entry here.
|
// member of `flowSchemas` has an entry here.
|
||||||
priorityLevelStates map[string]*priorityLevelState
|
priorityLevelStates map[string]*priorityLevelState
|
||||||
|
|
||||||
// the most recent update attempts, ordered by increasing age.
|
|
||||||
// Consumer trims to keep only the last minute's worth of entries.
|
|
||||||
// The controller uses this to limit itself to at most six updates
|
|
||||||
// to a given FlowSchema in any minute.
|
|
||||||
// This may only be accessed from the one and only worker goroutine.
|
|
||||||
mostRecentUpdates []updateAttempt
|
|
||||||
|
|
||||||
// watchTracker implements the necessary WatchTracker interface.
|
|
||||||
WatchTracker
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type updateAttempt struct {
|
type updateAttempt struct {
|
||||||
@ -281,8 +295,8 @@ func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cfgCtlr *configController) updateObservations() {
|
func (cfgCtlr *configController) updateObservations() {
|
||||||
cfgCtlr.lock.Lock()
|
cfgCtlr.lock.RLock()
|
||||||
defer cfgCtlr.lock.Unlock()
|
defer cfgCtlr.lock.RUnlock()
|
||||||
for _, plc := range cfgCtlr.priorityLevelStates {
|
for _, plc := range cfgCtlr.priorityLevelStates {
|
||||||
if plc.queues != nil {
|
if plc.queues != nil {
|
||||||
plc.queues.UpdateObservations()
|
plc.queues.UpdateObservations()
|
||||||
@ -779,8 +793,8 @@ func (immediateRequest) Finish(execute func()) bool {
|
|||||||
// waiting in its queue, or `Time{}` if this did not happen.
|
// waiting in its queue, or `Time{}` if this did not happen.
|
||||||
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) {
|
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) {
|
||||||
klog.V(7).Infof("startRequest(%#+v)", rd)
|
klog.V(7).Infof("startRequest(%#+v)", rd)
|
||||||
cfgCtlr.lock.Lock()
|
cfgCtlr.lock.RLock()
|
||||||
defer cfgCtlr.lock.Unlock()
|
defer cfgCtlr.lock.RUnlock()
|
||||||
var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
|
var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
|
||||||
for _, fs := range cfgCtlr.flowSchemas {
|
for _, fs := range cfgCtlr.flowSchemas {
|
||||||
if matchesFlowSchema(rd, fs) {
|
if matchesFlowSchema(rd, fs) {
|
||||||
@ -824,7 +838,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
|
|||||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
|
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
|
||||||
req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
|
req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
|
||||||
if idle {
|
if idle {
|
||||||
cfgCtlr.maybeReapLocked(plName, plState)
|
cfgCtlr.maybeReapReadLocked(plName, plState)
|
||||||
}
|
}
|
||||||
return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher
|
return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher
|
||||||
}
|
}
|
||||||
@ -833,8 +847,8 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
|
|||||||
// priority level if it has no more use. Call this after getting a
|
// priority level if it has no more use. Call this after getting a
|
||||||
// clue that the given priority level is undesired and idle.
|
// clue that the given priority level is undesired and idle.
|
||||||
func (cfgCtlr *configController) maybeReap(plName string) {
|
func (cfgCtlr *configController) maybeReap(plName string) {
|
||||||
cfgCtlr.lock.Lock()
|
cfgCtlr.lock.RLock()
|
||||||
defer cfgCtlr.lock.Unlock()
|
defer cfgCtlr.lock.RUnlock()
|
||||||
plState := cfgCtlr.priorityLevelStates[plName]
|
plState := cfgCtlr.priorityLevelStates[plName]
|
||||||
if plState == nil {
|
if plState == nil {
|
||||||
klog.V(7).Infof("plName=%s, plState==nil", plName)
|
klog.V(7).Infof("plName=%s, plState==nil", plName)
|
||||||
@ -856,7 +870,7 @@ func (cfgCtlr *configController) maybeReap(plName string) {
|
|||||||
// it has no more use. Call this if both (1) plState.queues is
|
// it has no more use. Call this if both (1) plState.queues is
|
||||||
// non-nil and reported being idle, and (2) cfgCtlr's lock has not
|
// non-nil and reported being idle, and (2) cfgCtlr's lock has not
|
||||||
// been released since then.
|
// been released since then.
|
||||||
func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
|
func (cfgCtlr *configController) maybeReapReadLocked(plName string, plState *priorityLevelState) {
|
||||||
if !(plState.quiescing && plState.numPending == 0) {
|
if !(plState.quiescing && plState.numPending == 0) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user