From 6c9b86646871f13a4431361310ba6a0785372053 Mon Sep 17 00:00:00 2001
From: staebler
Date: Mon, 5 Oct 2020 11:26:48 -0400
Subject: [PATCH] do not allow inflight watermark histograms to fall too far
 behind

The MaxInFlight and PriorityAndFairness apiserver filters maintain
watermarks with histogram metrics that are observed when requests are
handled. When a request is received, the watermark observer needs to fill
out observations for the entire time period since the last request was
received. If it has been a long time since a request was received, then it
can take an inordinate amount of time to fill out the observations, to the
extent that the request may time out. To combat this, these changes have
the filters fill out the observations on a 10-second interval, so that the
observations never fall too far behind. This follows a similar approach
taken in 9e89b92a92c02cdd2c70c0f52a30936e9c3309c7.

https://github.com/kubernetes/kubernetes/issues/95300

The Priority-and-Fairness and Max-in-Flight filters start goroutines to
handle some maintenance tasks on the watermarks for those filters. Once
started, these goroutines run forever. Instead, the goroutines should have
a lifetime tied to the lifetime of the apiserver. These changes move the
functionality for starting the goroutines to a PostStartHook. The
goroutines have been changed to accept a stop channel and to run only
until the stop channel is closed.
---
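For reviewers, the back-fill problem and the fix can be sketched in
miniature. This is a toy model for illustration only: toyObserver and its
one-sample-per-tick back-fill are invented stand-ins for the real fcmetrics
timing-histogram observer, and the intervals are shrunk so the sketch runs
in milliseconds rather than minutes. It shows why the first observation
after a long idle period is expensive, and why periodically observing a
zero delta (as startWatermarkMaintenance now does every 10 seconds via
wait.Until, bound to a stop channel) keeps that cost bounded.

package main

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

const tick = time.Millisecond // toy observation granularity

// toyObserver stands in for the filters' histogram observers (it is NOT
// the real fcmetrics type). Like the real observer, every call must first
// back-fill one sample of the old value for each tick elapsed since the
// previous call, so the cost of an observation grows with the idle gap in
// front of it.
type toyObserver struct {
	mu       sync.Mutex
	lastTick time.Time
	value    float64
	samples  int
}

func (o *toyObserver) Add(delta float64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	for now := time.Now(); o.lastTick.Add(tick).Before(now); {
		o.lastTick = o.lastTick.Add(tick)
		o.samples++ // one back-filled sample per elapsed tick
	}
	o.value += delta
}

func main() {
	o := &toyObserver{lastTick: time.Now()}
	stopCh := make(chan struct{})
	defer close(stopCh)

	// The fix, in miniature: observe a zero delta on a fixed interval so
	// that no single Add ever has more than one interval to back-fill.
	// The goroutine exits when stopCh closes, mirroring the PostStartHook
	// wiring that ties the maintenance goroutines to the server lifetime.
	go wait.Until(func() { o.Add(0) }, 10*time.Millisecond, stopCh)

	time.Sleep(500 * time.Millisecond) // a long idle stretch, no requests
	start := time.Now()
	o.Add(1) // the first "request" after the idle stretch
	fmt.Printf("first observation after idle: %v, %d samples\n",
		time.Since(start), o.samples)
}

Without the wait.Until loop, the final Add(1) would have the entire 500ms
gap to back-fill in a single call; with it, no call ever back-fills more
than about one 10ms interval of ticks.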
 .../src/k8s.io/apiserver/pkg/server/config.go | 25 +++++++++++
 .../apiserver/pkg/server/config_test.go       |  4 ++
 .../pkg/server/filters/maxinflight.go         | 45 ++++++++++++-------
 .../pkg/server/filters/maxinflight_test.go    | 13 ++++++
 .../server/filters/priority-and-fairness.go   | 17 +++----
 .../filters/priority-and-fairness_test.go     | 20 +++++++++
 6 files changed, 99 insertions(+), 25 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go
index 80e89e5210a..e3e8afcdcaa 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/config.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/config.go
@@ -639,6 +639,31 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
 		klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
 	}
 
+	// Add PostStartHooks for maintaining the watermarks for the Priority-and-Fairness and the Max-in-Flight filters.
+	if c.FlowControl != nil {
+		const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"
+		if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {
+			err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {
+				genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)
+				return nil
+			})
+			if err != nil {
+				return nil, err
+			}
+		}
+	} else {
+		const maxInFlightFilterHookName = "max-in-flight-filter"
+		if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {
+			err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {
+				genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)
+				return nil
+			})
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+
 	for _, delegateCheck := range delegationTarget.HealthzChecks() {
 		skip := false
 		for _, existingCheck := range c.HealthzChecks {
diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go
index fbd32097c8b..ae5e23c084a 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go
@@ -155,6 +155,7 @@ func TestNewWithDelegate(t *testing.T) {
 		"/healthz/ping",
 		"/healthz/poststarthook/delegate-post-start-hook",
 		"/healthz/poststarthook/generic-apiserver-start-informers",
+		"/healthz/poststarthook/max-in-flight-filter",
 		"/healthz/poststarthook/wrapping-post-start-hook",
 		"/healthz/wrapping-health",
 		"/livez",
@@ -163,6 +164,7 @@ func TestNewWithDelegate(t *testing.T) {
 		"/livez/ping",
 		"/livez/poststarthook/delegate-post-start-hook",
 		"/livez/poststarthook/generic-apiserver-start-informers",
+		"/livez/poststarthook/max-in-flight-filter",
 		"/livez/poststarthook/wrapping-post-start-hook",
 		"/metrics",
 		"/readyz",
@@ -172,6 +174,7 @@ func TestNewWithDelegate(t *testing.T) {
 		"/readyz/ping",
 		"/readyz/poststarthook/delegate-post-start-hook",
 		"/readyz/poststarthook/generic-apiserver-start-informers",
+		"/readyz/poststarthook/max-in-flight-filter",
 		"/readyz/poststarthook/wrapping-post-start-hook",
 		"/readyz/shutdown",
 	}
@@ -181,6 +184,7 @@ func TestNewWithDelegate(t *testing.T) {
 [-]wrapping-health failed: reason withheld
 [-]delegate-health failed: reason withheld
 [+]poststarthook/generic-apiserver-start-informers ok
+[+]poststarthook/max-in-flight-filter ok
 [+]poststarthook/delegate-post-start-hook ok
 [+]poststarthook/wrapping-post-start-hook ok
 healthz check failed
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
index 946ab4e605d..e873351c70b 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
@@ -41,6 +41,10 @@ const (
 	// the metrics tracks maximal value over period making this
 	// longer will increase the metric value.
 	inflightUsageMetricUpdatePeriod = time.Second
+
+	// How often to run maintenance on observations to ensure
+	// that they do not fall too far behind.
+	observationMaintenancePeriod = 10 * time.Second
 )
 
 var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
@@ -88,23 +92,29 @@ var watermark = &requestWatermark{
 	mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
 }
 
-func startRecordingUsage(watermark *requestWatermark) {
-	go func() {
-		wait.Forever(func() {
-			watermark.lock.Lock()
-			readOnlyWatermark := watermark.readOnlyWatermark
-			mutatingWatermark := watermark.mutatingWatermark
-			watermark.readOnlyWatermark = 0
-			watermark.mutatingWatermark = 0
-			watermark.lock.Unlock()
+// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
+func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) {
+	// Periodically update the inflight usage metric.
+	go wait.Until(func() {
+		watermark.lock.Lock()
+		readOnlyWatermark := watermark.readOnlyWatermark
+		mutatingWatermark := watermark.mutatingWatermark
+		watermark.readOnlyWatermark = 0
+		watermark.mutatingWatermark = 0
+		watermark.lock.Unlock()
 
-			metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
-		}, inflightUsageMetricUpdatePeriod)
-	}()
+		metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
+	}, inflightUsageMetricUpdatePeriod, stopCh)
+
+	// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
+	// fall too far behind, then there is a long delay in responding to the next request received while the observer
+	// catches back up.
+	go wait.Until(func() {
+		watermark.readOnlyObserver.Add(0)
+		watermark.mutatingObserver.Add(0)
+	}, observationMaintenancePeriod, stopCh)
 }
 
-var startOnce sync.Once
-
 // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
 func WithMaxInFlightLimit(
 	handler http.Handler,
@@ -112,7 +122,6 @@ func WithMaxInFlightLimit(
 	mutatingLimit int,
 	longRunningRequestCheck apirequest.LongRunningRequestCheck,
 ) http.Handler {
-	startOnce.Do(func() { startRecordingUsage(watermark) })
 	if nonMutatingLimit == 0 && mutatingLimit == 0 {
 		return handler
 	}
@@ -198,6 +207,12 @@ func WithMaxInFlightLimit(
 	})
 }
 
+// StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight
+// requests.
+func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {
+	startWatermarkMaintenance(watermark, stopCh)
+}
+
 func tooManyRequests(req *http.Request, w http.ResponseWriter) {
 	// Return a 429 status indicating "Too Many Requests"
 	w.Header().Set("Retry-After", retryAfter)
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go
index c3b4c53d35e..3bbcace9230 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package filters
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -103,6 +104,10 @@ func TestMaxInFlightNonMutating(t *testing.T) {
 	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartMaxInFlightWatermarkMaintenance(ctx.Done())
+
 	// These should hang, but not affect accounting.  use a query param match
 	for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
 		// These should hang waiting on block...
@@ -183,6 +188,10 @@ func TestMaxInFlightMutating(t *testing.T) {
 	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartMaxInFlightWatermarkMaintenance(ctx.Done())
+
 	// These should hang and be accounted, i.e. saturate the server
 	for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
 		// These should hang waiting on block...
@@ -275,6 +284,10 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
 	server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartMaxInFlightWatermarkMaintenance(ctx.Done())
+
 	// These should hang and be accounted, i.e. saturate the server
 	for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
 		// These should hang waiting on block...
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index 6c9e8b4938c..fa5426b387b 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"net/http"
-	"sync"
 	"sync/atomic"
 
 	fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
@@ -58,9 +57,6 @@ var waitingMark = &requestWatermark{
 	mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
 }
 
-// apfStartOnce is used to avoid sharing one-time mutex with maxinflight handler
-var apfStartOnce sync.Once
-
 var atomicMutatingExecuting, atomicReadOnlyExecuting int32
 var atomicMutatingWaiting, atomicReadOnlyWaiting int32
 
@@ -75,12 +71,6 @@ func WithPriorityAndFairness(
 		klog.Warningf("priority and fairness support not found, skipping")
 		return handler
 	}
-	startOnce.Do(func() {
-		startRecordingUsage(watermark)
-	})
-	apfStartOnce.Do(func() {
-		startRecordingUsage(waitingMark)
-	})
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
 		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@@ -156,3 +146,10 @@ func WithPriorityAndFairness(
 	})
 }
+
+// StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for
+// priority-and-fairness requests.
+func StartPriorityAndFairnessWatermarkMaintenance(stopCh <-chan struct{}) {
+	startWatermarkMaintenance(watermark, stopCh)
+	startWatermarkMaintenance(waitingMark, stopCh)
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index d2e130b3576..cd8787382b8 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -153,6 +153,10 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
 	server := newApfServerWithSingleRequest(decisionSkipFilter, t)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
+
 	// send a watch request to test skipping long running request
 	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
 		// request should not be rejected
@@ -166,6 +170,10 @@ func TestApfRejectRequest(t *testing.T) {
 	server := newApfServerWithSingleRequest(decisionReject, t)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
+
 	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
 		t.Error(err)
 	}
@@ -187,6 +195,10 @@ func TestApfExemptRequest(t *testing.T) {
 	server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
+
 	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
 		t.Error(err)
 	}
@@ -209,6 +221,10 @@ func TestApfExecuteRequest(t *testing.T) {
 	server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
+
 	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
 		t.Error(err)
 	}
@@ -274,6 +290,10 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 	server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
 	defer server.Close()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
+
 	for i := 0; i < concurrentRequests; i++ {
 		var err error
 		go func() {