do not allow inflight watermark histograms to fall too far behind

The MaxInFlight and PriorityAndFairness apiserver filters maintain
watermarks that are recorded in histogram metrics as requests are
handled. When a request is received, the watermark observer must
fill out observations covering the entire period since the last
request was received. If no request has arrived for a long time,
filling out those observations can take an inordinate amount of
time, to the point that the request may time out. To combat this,
these changes have the filters fill out the observations on a
10-second interval, so that the observations never fall too far
behind.
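
The maintenance itself amounts to recording a zero-delta observation on a
timer. A minimal sketch of that pattern, using a hypothetical stand-in for
the filters' watermark observer (the real observer lives in the flowcontrol
metrics package):

package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// observer is a hypothetical stand-in for the watermark observer used by the
// filters; the real one records sample-and-watermark histogram observations.
type observer interface {
	Add(delta float64)
}

// maintainObservations records a zero delta every period until stopCh closes,
// so the observer never has a long gap to fill when the next request arrives.
func maintainObservations(o observer, period time.Duration, stopCh <-chan struct{}) {
	go wait.Until(func() { o.Add(0) }, period, stopCh)
}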

This follows a similar approach taken in
9e89b92a92.

https://github.com/kubernetes/kubernetes/issues/95300

The Priority-and-Fairness and Max-in-Flight filters start goroutines to
perform maintenance tasks on their watermarks. Once started, these
goroutines run forever. Instead, their lifetime should be tied to the
lifetime of the apiserver.

These changes move the goroutine startup into a PostStartHook. The
goroutines now accept a stop channel and run only until that channel is
closed.
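
Sketched end to end, the wiring looks roughly like the fragment below. The
hook name, the "s" server variable, the "genericapiserver" alias for
k8s.io/apiserver/pkg/server, and the "watermarkObserver" helper are all
illustrative; PostStartHookContext.StopCh is the channel the apiserver
closes on shutdown.

	// Register during server construction; the goroutine started by the
	// hook exits once the server's stop channel closes.
	err := s.AddPostStartHook("example-watermark-maintenance", func(hookContext genericapiserver.PostStartHookContext) error {
		go wait.Until(func() { watermarkObserver.Add(0) }, 10*time.Second, hookContext.StopCh)
		return nil
	})
	if err != nil {
		return nil, err
	}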
Author: staebler
Date: 2020-10-05 11:26:48 -04:00
Commit: 6c9b866468 (parent fe654797b5)
6 changed files with 99 additions and 25 deletions


@ -639,6 +639,31 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
}
// Add PostStartHooks for maintaining the watermarks for the Priority-and-Fairness and the Max-in-Flight filters.
if c.FlowControl != nil {
const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"
if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {
err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
} else {
const maxInFlightFilterHookName = "max-in-flight-filter"
if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {
err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
}
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {


@ -155,6 +155,7 @@ func TestNewWithDelegate(t *testing.T) {
"/healthz/ping",
"/healthz/poststarthook/delegate-post-start-hook",
"/healthz/poststarthook/generic-apiserver-start-informers",
"/healthz/poststarthook/max-in-flight-filter",
"/healthz/poststarthook/wrapping-post-start-hook",
"/healthz/wrapping-health",
"/livez",
@ -163,6 +164,7 @@ func TestNewWithDelegate(t *testing.T) {
"/livez/ping",
"/livez/poststarthook/delegate-post-start-hook",
"/livez/poststarthook/generic-apiserver-start-informers",
"/livez/poststarthook/max-in-flight-filter",
"/livez/poststarthook/wrapping-post-start-hook",
"/metrics",
"/readyz",
@ -172,6 +174,7 @@ func TestNewWithDelegate(t *testing.T) {
"/readyz/ping",
"/readyz/poststarthook/delegate-post-start-hook",
"/readyz/poststarthook/generic-apiserver-start-informers",
"/readyz/poststarthook/max-in-flight-filter",
"/readyz/poststarthook/wrapping-post-start-hook",
"/readyz/shutdown",
}
@ -181,6 +184,7 @@ func TestNewWithDelegate(t *testing.T) {
[-]wrapping-health failed: reason withheld
[-]delegate-health failed: reason withheld
[+]poststarthook/generic-apiserver-start-informers ok
[+]poststarthook/max-in-flight-filter ok
[+]poststarthook/delegate-post-start-hook ok
[+]poststarthook/wrapping-post-start-hook ok
healthz check failed


@ -41,6 +41,10 @@ const (
	// the metrics tracks maximal value over period making this
	// longer will increase the metric value.
	inflightUsageMetricUpdatePeriod = time.Second

	// How often to run maintenance on observations to ensure
	// that they do not fall too far behind.
	observationMaintenancePeriod = 10 * time.Second
)

var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
@ -88,23 +92,29 @@ var watermark = &requestWatermark{
	mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
}

-func startRecordingUsage(watermark *requestWatermark) {
-	go func() {
-		wait.Forever(func() {
-			watermark.lock.Lock()
-			readOnlyWatermark := watermark.readOnlyWatermark
-			mutatingWatermark := watermark.mutatingWatermark
-			watermark.readOnlyWatermark = 0
-			watermark.mutatingWatermark = 0
-			watermark.lock.Unlock()
-
-			metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
-		}, inflightUsageMetricUpdatePeriod)
-	}()
-}
+// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
+func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) {
+	// Periodically update the inflight usage metric.
+	go wait.Until(func() {
+		watermark.lock.Lock()
+		readOnlyWatermark := watermark.readOnlyWatermark
+		mutatingWatermark := watermark.mutatingWatermark
+		watermark.readOnlyWatermark = 0
+		watermark.mutatingWatermark = 0
+		watermark.lock.Unlock()
+
+		metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
+	}, inflightUsageMetricUpdatePeriod, stopCh)
+
+	// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
+	// fall too far behind, then there is a long delay in responding to the next request received while the observer
+	// catches back up.
+	go wait.Until(func() {
+		watermark.readOnlyObserver.Add(0)
+		watermark.mutatingObserver.Add(0)
+	}, observationMaintenancePeriod, stopCh)
+}

var startOnce sync.Once

// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
func WithMaxInFlightLimit(
	handler http.Handler,
@ -112,7 +122,6 @@ func WithMaxInFlightLimit(
	mutatingLimit int,
	longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
-	startOnce.Do(func() { startRecordingUsage(watermark) })
	if nonMutatingLimit == 0 && mutatingLimit == 0 {
		return handler
	}
@ -198,6 +207,12 @@ func WithMaxInFlightLimit(
	})
}

// StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight
// requests.
func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {
	startWatermarkMaintenance(watermark, stopCh)
}

func tooManyRequests(req *http.Request, w http.ResponseWriter) {
	// Return a 429 status indicating "Too Many Requests"
	w.Header().Set("Retry-After", retryAfter)


@ -17,6 +17,7 @@ limitations under the License.
package filters
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
@ -103,6 +104,10 @@ func TestMaxInFlightNonMutating(t *testing.T) {
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartMaxInFlightWatermarkMaintenance(ctx.Done())
// These should hang, but not affect accounting. use a query param match
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
// These should hang waiting on block...
@ -183,6 +188,10 @@ func TestMaxInFlightMutating(t *testing.T) {
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartMaxInFlightWatermarkMaintenance(ctx.Done())
// These should hang and be accounted, i.e. saturate the server
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
// These should hang waiting on block...
@ -275,6 +284,10 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartMaxInFlightWatermarkMaintenance(ctx.Done())
// These should hang and be accounted, i.e. saturate the server
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
// These should hang waiting on block...


@ -20,7 +20,6 @@ import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
@ -58,9 +57,6 @@ var waitingMark = &requestWatermark{
	mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
}

-// apfStartOnce is used to avoid sharing one-time mutex with maxinflight handler
-var apfStartOnce sync.Once
-
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
@ -75,12 +71,6 @@ func WithPriorityAndFairness(
klog.Warningf("priority and fairness support not found, skipping")
return handler
}
startOnce.Do(func() {
startRecordingUsage(watermark)
})
apfStartOnce.Do(func() {
startRecordingUsage(waitingMark)
})
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@ -156,3 +146,10 @@ func WithPriorityAndFairness(
	})
}

// StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for
// priority-and-fairness requests.
func StartPriorityAndFairnessWatermarkMaintenance(stopCh <-chan struct{}) {
	startWatermarkMaintenance(watermark, stopCh)
	startWatermarkMaintenance(waitingMark, stopCh)
}


@ -153,6 +153,10 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionSkipFilter, t)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
// send a watch request to test skipping long running request
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
// request should not be rejected
@ -166,6 +170,10 @@ func TestApfRejectRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionReject, t)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
t.Error(err)
}
@ -187,6 +195,10 @@ func TestApfExemptRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
@ -209,6 +221,10 @@ func TestApfExecuteRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
@ -274,6 +290,10 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
for i := 0; i < concurrentRequests; i++ {
var err error
go func() {