Merge pull request #95371 from staebler/plumb_channel_into_filters

plumb channel for lifetime of filter watermarks
This commit is contained in:
Kubernetes Prow Robot 2020-11-04 11:44:52 -08:00 committed by GitHub
commit 396b90f06c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 99 additions and 25 deletions

View File

@ -639,6 +639,31 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName) klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
} }
// Add PostStartHooks for maintaining the watermarks for the Priority-and-Fairness and the Max-in-Flight filters.
if c.FlowControl != nil {
const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"
if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {
err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
} else {
const maxInFlightFilterHookName = "max-in-flight-filter"
if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {
err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
}
for _, delegateCheck := range delegationTarget.HealthzChecks() { for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false skip := false
for _, existingCheck := range c.HealthzChecks { for _, existingCheck := range c.HealthzChecks {

View File

@ -155,6 +155,7 @@ func TestNewWithDelegate(t *testing.T) {
"/healthz/ping", "/healthz/ping",
"/healthz/poststarthook/delegate-post-start-hook", "/healthz/poststarthook/delegate-post-start-hook",
"/healthz/poststarthook/generic-apiserver-start-informers", "/healthz/poststarthook/generic-apiserver-start-informers",
"/healthz/poststarthook/max-in-flight-filter",
"/healthz/poststarthook/wrapping-post-start-hook", "/healthz/poststarthook/wrapping-post-start-hook",
"/healthz/wrapping-health", "/healthz/wrapping-health",
"/livez", "/livez",
@ -163,6 +164,7 @@ func TestNewWithDelegate(t *testing.T) {
"/livez/ping", "/livez/ping",
"/livez/poststarthook/delegate-post-start-hook", "/livez/poststarthook/delegate-post-start-hook",
"/livez/poststarthook/generic-apiserver-start-informers", "/livez/poststarthook/generic-apiserver-start-informers",
"/livez/poststarthook/max-in-flight-filter",
"/livez/poststarthook/wrapping-post-start-hook", "/livez/poststarthook/wrapping-post-start-hook",
"/metrics", "/metrics",
"/readyz", "/readyz",
@ -172,6 +174,7 @@ func TestNewWithDelegate(t *testing.T) {
"/readyz/ping", "/readyz/ping",
"/readyz/poststarthook/delegate-post-start-hook", "/readyz/poststarthook/delegate-post-start-hook",
"/readyz/poststarthook/generic-apiserver-start-informers", "/readyz/poststarthook/generic-apiserver-start-informers",
"/readyz/poststarthook/max-in-flight-filter",
"/readyz/poststarthook/wrapping-post-start-hook", "/readyz/poststarthook/wrapping-post-start-hook",
"/readyz/shutdown", "/readyz/shutdown",
} }
@ -181,6 +184,7 @@ func TestNewWithDelegate(t *testing.T) {
[-]wrapping-health failed: reason withheld [-]wrapping-health failed: reason withheld
[-]delegate-health failed: reason withheld [-]delegate-health failed: reason withheld
[+]poststarthook/generic-apiserver-start-informers ok [+]poststarthook/generic-apiserver-start-informers ok
[+]poststarthook/max-in-flight-filter ok
[+]poststarthook/delegate-post-start-hook ok [+]poststarthook/delegate-post-start-hook ok
[+]poststarthook/wrapping-post-start-hook ok [+]poststarthook/wrapping-post-start-hook ok
healthz check failed healthz check failed

View File

@ -41,6 +41,10 @@ const (
// the metrics tracks maximal value over period making this // the metrics tracks maximal value over period making this
// longer will increase the metric value. // longer will increase the metric value.
inflightUsageMetricUpdatePeriod = time.Second inflightUsageMetricUpdatePeriod = time.Second
// How often to run maintenance on observations to ensure
// that they do not fall too far behind.
observationMaintenancePeriod = 10 * time.Second
) )
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch") var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
@ -88,23 +92,29 @@ var watermark = &requestWatermark{
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting, mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
} }
func startRecordingUsage(watermark *requestWatermark) { // startWatermarkMaintenance starts the goroutines to observe and maintain the specified watermark.
go func() { func startWatermarkMaintenance(watermark *requestWatermark, stopCh <-chan struct{}) {
wait.Forever(func() { // Periodically update the inflight usage metric.
watermark.lock.Lock() go wait.Until(func() {
readOnlyWatermark := watermark.readOnlyWatermark watermark.lock.Lock()
mutatingWatermark := watermark.mutatingWatermark readOnlyWatermark := watermark.readOnlyWatermark
watermark.readOnlyWatermark = 0 mutatingWatermark := watermark.mutatingWatermark
watermark.mutatingWatermark = 0 watermark.readOnlyWatermark = 0
watermark.lock.Unlock() watermark.mutatingWatermark = 0
watermark.lock.Unlock()
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark) metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
}, inflightUsageMetricUpdatePeriod) }, inflightUsageMetricUpdatePeriod, stopCh)
}()
// Periodically observe the watermarks. This is done to ensure that they do not fall too far behind. When they do
// fall too far behind, then there is a long delay in responding to the next request received while the observer
// catches back up.
go wait.Until(func() {
watermark.readOnlyObserver.Add(0)
watermark.mutatingObserver.Add(0)
}, observationMaintenancePeriod, stopCh)
} }
var startOnce sync.Once
// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel. // WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
func WithMaxInFlightLimit( func WithMaxInFlightLimit(
handler http.Handler, handler http.Handler,
@ -112,7 +122,6 @@ func WithMaxInFlightLimit(
mutatingLimit int, mutatingLimit int,
longRunningRequestCheck apirequest.LongRunningRequestCheck, longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler { ) http.Handler {
startOnce.Do(func() { startRecordingUsage(watermark) })
if nonMutatingLimit == 0 && mutatingLimit == 0 { if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler return handler
} }
@ -198,6 +207,12 @@ func WithMaxInFlightLimit(
}) })
} }
// StartMaxInFlightWatermarkMaintenance starts the goroutines to observe and maintain watermarks for max-in-flight
// requests.
func StartMaxInFlightWatermarkMaintenance(stopCh <-chan struct{}) {
startWatermarkMaintenance(watermark, stopCh)
}
func tooManyRequests(req *http.Request, w http.ResponseWriter) { func tooManyRequests(req *http.Request, w http.ResponseWriter) {
// Return a 429 status indicating "Too Many Requests" // Return a 429 status indicating "Too Many Requests"
w.Header().Set("Retry-After", retryAfter) w.Header().Set("Retry-After", retryAfter)

View File

@ -17,6 +17,7 @@ limitations under the License.
package filters package filters
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -103,6 +104,10 @@ func TestMaxInFlightNonMutating(t *testing.T) {
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1) server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, AllowedNonMutatingInflightRequestsNo, 1)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartMaxInFlightWatermarkMaintenance(ctx.Done())
// These should hang, but not affect accounting. use a query param match // These should hang, but not affect accounting. use a query param match
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ { for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
// These should hang waiting on block... // These should hang waiting on block...
@ -183,6 +188,10 @@ func TestMaxInFlightMutating(t *testing.T) {
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo) server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartMaxInFlightWatermarkMaintenance(ctx.Done())
// These should hang and be accounted, i.e. saturate the server // These should hang and be accounted, i.e. saturate the server
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ { for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
// These should hang waiting on block... // These should hang waiting on block...
@ -275,6 +284,10 @@ func TestMaxInFlightSkipsMasters(t *testing.T) {
server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo) server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartMaxInFlightWatermarkMaintenance(ctx.Done())
// These should hang and be accounted, i.e. saturate the server // These should hang and be accounted, i.e. saturate the server
for i := 0; i < AllowedMutatingInflightRequestsNo; i++ { for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
// These should hang waiting on block... // These should hang waiting on block...

View File

@ -20,7 +20,6 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"sync/atomic" "sync/atomic"
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1" fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
@ -58,9 +57,6 @@ var waitingMark = &requestWatermark{
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting, mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
} }
// apfStartOnce is used to avoid sharing one-time mutex with maxinflight handler
var apfStartOnce sync.Once
var atomicMutatingExecuting, atomicReadOnlyExecuting int32 var atomicMutatingExecuting, atomicReadOnlyExecuting int32
var atomicMutatingWaiting, atomicReadOnlyWaiting int32 var atomicMutatingWaiting, atomicReadOnlyWaiting int32
@ -75,12 +71,6 @@ func WithPriorityAndFairness(
klog.Warningf("priority and fairness support not found, skipping") klog.Warningf("priority and fairness support not found, skipping")
return handler return handler
} }
startOnce.Do(func() {
startRecordingUsage(watermark)
})
apfStartOnce.Do(func() {
startRecordingUsage(waitingMark)
})
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx) requestInfo, ok := apirequest.RequestInfoFrom(ctx)
@ -156,3 +146,10 @@ func WithPriorityAndFairness(
}) })
} }
// StartPriorityAndFairnessWatermarkMaintenance starts the goroutines to observe and maintain watermarks for
// priority-and-fairness requests.
func StartPriorityAndFairnessWatermarkMaintenance(stopCh <-chan struct{}) {
startWatermarkMaintenance(watermark, stopCh)
startWatermarkMaintenance(waitingMark, stopCh)
}

View File

@ -153,6 +153,10 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionSkipFilter, t) server := newApfServerWithSingleRequest(decisionSkipFilter, t)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
// send a watch request to test skipping long running request // send a watch request to test skipping long running request
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil { if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
// request should not be rejected // request should not be rejected
@ -166,6 +170,10 @@ func TestApfRejectRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionReject, t) server := newApfServerWithSingleRequest(decisionReject, t)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
t.Error(err) t.Error(err)
} }
@ -187,6 +195,10 @@ func TestApfExemptRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t) server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
t.Error(err) t.Error(err)
} }
@ -209,6 +221,10 @@ func TestApfExecuteRequest(t *testing.T) {
server := newApfServerWithSingleRequest(decisionQueuingExecute, t) server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil {
t.Error(err) t.Error(err)
} }
@ -274,6 +290,10 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
defer server.Close() defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
for i := 0; i < concurrentRequests; i++ { for i := 0; i < concurrentRequests; i++ {
var err error var err error
go func() { go func() {