diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 80e89e5210a..e3e8afcdcaa 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -639,6 +639,31 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName) } + // Add PostStartHooks for maintaining the watermarks for the Priority-and-Fairness and the Max-in-Flight filters. + if c.FlowControl != nil { + const priorityAndFairnessFilterHookName = "priority-and-fairness-filter" + if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) { + err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error { + genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh) + return nil + }) + if err != nil { + return nil, err + } + } + } else { + const maxInFlightFilterHookName = "max-in-flight-filter" + if !s.isPostStartHookRegistered(maxInFlightFilterHookName) { + err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error { + genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh) + return nil + }) + if err != nil { + return nil, err + } + } + } + for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index fbd32097c8b..ae5e23c084a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -155,6 +155,7 @@ func TestNewWithDelegate(t *testing.T) { "/healthz/ping", "/healthz/poststarthook/delegate-post-start-hook", "/healthz/poststarthook/generic-apiserver-start-informers", + "/healthz/poststarthook/max-in-flight-filter", "/healthz/poststarthook/wrapping-post-start-hook", "/healthz/wrapping-health", "/livez", @@ -163,6 +164,7 @@ func TestNewWithDelegate(t *testing.T) { "/livez/ping", "/livez/poststarthook/delegate-post-start-hook", "/livez/poststarthook/generic-apiserver-start-informers", + "/livez/poststarthook/max-in-flight-filter", "/livez/poststarthook/wrapping-post-start-hook", "/metrics", "/readyz", @@ -172,6 +174,7 @@ func TestNewWithDelegate(t *testing.T) { "/readyz/ping", "/readyz/poststarthook/delegate-post-start-hook", "/readyz/poststarthook/generic-apiserver-start-informers", + "/readyz/poststarthook/max-in-flight-filter", "/readyz/poststarthook/wrapping-post-start-hook", "/readyz/shutdown", } @@ -181,6 +184,7 @@ func TestNewWithDelegate(t *testing.T) { [-]wrapping-health failed: reason withheld [-]delegate-health failed: reason withheld [+]poststarthook/generic-apiserver-start-informers ok +[+]poststarthook/max-in-flight-filter ok [+]poststarthook/delegate-post-start-hook ok [+]poststarthook/wrapping-post-start-hook ok healthz check failed diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index 946ab4e605d..e873351c70b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -41,6 +41,10 @@ const ( // the metrics tracks maximal value over period making this // longer will increase the metric value. inflightUsageMetricUpdatePeriod = time.Second + + // How often to run maintenance on observations to ensure + // that they do not fall too far behind. + observationMaintenancePeriod = 10 * time.Second ) var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch") @@ -88,23 +92,29 @@ var watermark = &requestWatermark{ mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting, } -func startRecordingUsage(watermark *requestWatermark) { - go func() { - wait.Forever(func() { - watermark.lock.Lock() - readOnlyWatermark := watermark.readOnlyWatermark - mutatingWatermark := watermark.mutatingWatermark - watermark.readOnlyWatermark = 0 - watermark.mutatingWatermark = 0 - watermark.lock.Unlock() +// startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark. +func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) { + // Periodically update the inflight usage metric. + go wait.Until(func() { + watermark.lock.Lock() + readOnlyWatermark := watermark.readOnlyWatermark + mutatingWatermark := watermark.mutatingWatermark + watermark.readOnlyWatermark = 0 + watermark.mutatingWatermark = 0 + watermark.lock.Unlock() - metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark) - }, inflightUsageMetricUpdatePeriod) - }() + metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark) + }, inflightUsageMetricUpdatePeriod, stopCh) + + // Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do + // fall too far behind, then there is a long delay in responding to the next request received while the observer + // catches back up. + go wait.Until(func() { + watermark.readOnlyObserver.Add(0) + watermark.mutatingObserver.Add(0) + }, observationMaintenancePeriod, stopCh) } -var startOnce sync.Once - // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel. func WithMaxInFlightLimit( handler http.Handler, @@ -112,7 +122,6 @@ func WithMaxInFlightLimit( mutatingLimit int, longRunningRequestCheck apirequest.LongRunningRequestCheck, ) http.Handler { - startOnce.Do(func() { startRecordingUsage(watermark) }) if nonMutatingLimit == 0 && mutatingLimit == 0 { return handler } @@ -198,6 +207,12 @@ func WithMaxInFlightLimit( }) } +// StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight +// requests. +func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) { + startWatermarkMaintenance(watermark, stopCh) +} + func tooManyRequests(req *http.Request, w http.ResponseWriter) { // Return a 429 status indicating "Too Many Requests" w.Header().Set("Retry-After", retryAfter) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go index c3b4c53d35e..3bbcace9230 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go @@ -17,6 +17,7 @@ limitations under the License. package filters import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -103,6 +104,10 @@ func TestMaxInFlightNonMutating(t *testing.T) { server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartMaxInFlightWatermarkMaintenance(ctx.Done()) + // These should hang, but not affect accounting. use a query param match for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ { // These should hang waiting on block... @@ -183,6 +188,10 @@ func TestMaxInFlightMutating(t *testing.T) { server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartMaxInFlightWatermarkMaintenance(ctx.Done()) + // These should hang and be accounted, i.e. saturate the server for i := 0; i < AllowedMutatingInflightRequestsNo; i++ { // These should hang waiting on block... @@ -275,6 +284,10 @@ func TestMaxInFlightSkipsMasters(t *testing.T) { server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartMaxInFlightWatermarkMaintenance(ctx.Done()) + // These should hang and be accounted, i.e. saturate the server for i := 0; i < AllowedMutatingInflightRequestsNo; i++ { // These should hang waiting on block... diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 6c9e8b4938c..fa5426b387b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - "sync" "sync/atomic" fcv1a1 "k8s.io/api/flowcontrol/v1alpha1" @@ -58,9 +57,6 @@ var waitingMark = &requestWatermark{ mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting, } -// apfStartOnce is used to avoid sharing one-time mutex with maxinflight handler -var apfStartOnce sync.Once - var atomicMutatingExecuting, atomicReadOnlyExecuting int32 var atomicMutatingWaiting, atomicReadOnlyWaiting int32 @@ -75,12 +71,6 @@ func WithPriorityAndFairness( klog.Warningf("priority and fairness support not found, skipping") return handler } - startOnce.Do(func() { - startRecordingUsage(watermark) - }) - apfStartOnce.Do(func() { - startRecordingUsage(waitingMark) - }) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() requestInfo, ok := apirequest.RequestInfoFrom(ctx) @@ -156,3 +146,10 @@ func WithPriorityAndFairness( }) } + +// StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for +// priority-and-fairness requests. +func StartPriorityAndFairnessWatermarkMaintenance(stopCh <-chan struct{}) { + startWatermarkMaintenance(watermark, stopCh) + startWatermarkMaintenance(waitingMark, stopCh) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index d2e130b3576..cd8787382b8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -153,6 +153,10 @@ func TestApfSkipLongRunningRequest(t *testing.T) { server := newApfServerWithSingleRequest(decisionSkipFilter, t) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartPriorityAndFairnessWatermarkMaintenance(ctx.Done()) + // send a watch request to test skipping long running request if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil { // request should not be rejected @@ -166,6 +170,10 @@ func TestApfRejectRequest(t *testing.T) { server := newApfServerWithSingleRequest(decisionReject, t) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartPriorityAndFairnessWatermarkMaintenance(ctx.Done()) + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { t.Error(err) } @@ -187,6 +195,10 @@ func TestApfExemptRequest(t *testing.T) { server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartPriorityAndFairnessWatermarkMaintenance(ctx.Done()) + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { t.Error(err) } @@ -209,6 +221,10 @@ func TestApfExecuteRequest(t *testing.T) { server := newApfServerWithSingleRequest(decisionQueuingExecute, t) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartPriorityAndFairnessWatermarkMaintenance(ctx.Done()) + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { t.Error(err) } @@ -274,6 +290,10 @@ func TestApfExecuteMultipleRequests(t *testing.T) { server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) defer server.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + StartPriorityAndFairnessWatermarkMaintenance(ctx.Done()) + for i := 0; i < concurrentRequests; i++ { var err error go func() {