diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index 00b6c030179..c89cbbd86a1 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -17,6 +17,7 @@ limitations under the License.
 package filters
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"runtime"
@@ -113,55 +114,11 @@ func WithPriorityAndFairness(
 				waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
 			}
 		}
-		var resultCh chan interface{}
-		var forgetWatch utilflowcontrol.ForgetWatchFunc
-		if isWatchRequest {
-			resultCh = make(chan interface{})
-		}
-		execute := func() {
-			noteExecutingDelta(1)
-			defer noteExecutingDelta(-1)
-			served = true
-
-			innerCtx := ctx
-			innerReq := r
-
-			var watchInitializationSignal utilflowcontrol.InitializationSignal
-			if isWatchRequest {
-				watchInitializationSignal = newInitializationSignal()
-				innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
-				innerReq = r.Clone(innerCtx)
-			}
-			setResponseHeaders(classification, w)
-
-			forgetWatch = fcIfc.RegisterWatch(requestInfo)
-
-			if isWatchRequest {
-				go func() {
-					defer func() {
-						err := recover()
-						// do not wrap the sentinel ErrAbortHandler panic value
-						if err != nil && err != http.ErrAbortHandler {
-							// Same as stdlib http server code. Manually allocate stack
-							// trace buffer size to prevent excessively large logs
-							const size = 64 << 10
-							buf := make([]byte, size)
-							buf = buf[:runtime.Stack(buf, false)]
-							err = fmt.Sprintf("%v\n%s", err, buf)
-						}
-						resultCh <- err
-					}()
-
-					// Protect from the situations when request will not reach storage layer
-					// and the initialization signal will not be send.
-					defer watchInitializationSignal.Signal()
-
-					handler.ServeHTTP(w, innerReq)
-				}()
-
-				watchInitializationSignal.Wait()
+		queueNote := func(inQueue bool) {
+			if inQueue {
+				noteWaitingDelta(1)
 			} else {
-				handler.ServeHTTP(w, innerReq)
+				noteWaitingDelta(-1)
 			}
 		}
 
@@ -171,13 +128,128 @@ func WithPriorityAndFairness(
 		width := widthEstimator.EstimateWidth(r)
 		digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width}
 
-		fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
-			if inQueue {
-				noteWaitingDelta(1)
-			} else {
-				noteWaitingDelta(-1)
+		if isWatchRequest {
+			// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
+			// If APF rejects the request, it is never closed.
+			shouldStartWatchCh := make(chan struct{})
+
+			watchInitializationSignal := newInitializationSignal()
+			// This wraps the request passed to handler.ServeHTTP(),
+			// setting a context that plumbs watchInitializationSignal to storage.
+			var watchReq *http.Request
+			// This is set inside execute(), prior to closing shouldStartWatchCh.
+			// If the request is rejected by APF it is left nil.
+			var forgetWatch utilflowcontrol.ForgetWatchFunc
+
+			defer func() {
+				// Protect from the situation when the request does not reach the storage layer
+				// and the initialization signal is never sent.
+				if watchInitializationSignal != nil {
+					watchInitializationSignal.Signal()
+				}
+				// Forget the watcher if it was registered.
+
+				// This is race-free because by this point, one of the following occurred:
+				// case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch
+				// case <-resultCh: Handle() completed, and Handle() does not return
+				// while execute() is running
+				if forgetWatch != nil {
+					forgetWatch()
+				}
+			}()
+
+			execute := func() {
+				noteExecutingDelta(1)
+				defer noteExecutingDelta(-1)
+				served = true
+				setResponseHeaders(classification, w)
+
+				forgetWatch = fcIfc.RegisterWatch(requestInfo)
+
+				// Notify the main thread that we're ready to start the watch.
+				close(shouldStartWatchCh)
+
+				// Wait until the request is finished from the APF point of view
+				// (which is when its initialization is done).
+				watchInitializationSignal.Wait()
 			}
-		}, execute)
+
+			// Ensure that an item can be put to resultCh asynchronously.
+			resultCh := make(chan interface{}, 1)
+
+			// Call Handle in a separate goroutine.
+			// The reason for it is that from the APF point of view, the request processing
+			// finishes as soon as the watch is initialized (which is generally orders of
+			// magnitude faster than the watch request itself). This means that the Handle()
+			// call finishes much faster and for performance reasons we want to reduce
+			// the number of running goroutines - so we run the shorter thing in a
+			// dedicated goroutine and the actual watch handler in the main one.
+			go func() {
+				defer func() {
+					err := recover()
+					// do not wrap the sentinel ErrAbortHandler panic value
+					if err != nil && err != http.ErrAbortHandler {
+						// Same as stdlib http server code. Manually allocate stack
+						// trace buffer size to prevent excessively large logs
+						const size = 64 << 10
+						buf := make([]byte, size)
+						buf = buf[:runtime.Stack(buf, false)]
+						err = fmt.Sprintf("%v\n%s", err, buf)
+					}
+
+					// Ensure that the result is put into resultCh independently of the panic.
+					resultCh <- err
+				}()
+
+				// We create handleCtx with an explicit cancellation function.
+				// The reason for it is that Handle() underneath may start an additional goroutine
+				// that is blocked on context cancellation. However, from the APF point of view,
+				// we don't want to wait until the whole watch request is processed (which is
+				// when its context is actually cancelled) - we want to unblock the goroutine as
+				// soon as the request is processed from the APF point of view.
+				//
+				// Note that we explicitly do NOT call the actual handler using that context
+				// to avoid cancelling the request too early.
+				handleCtx, handleCtxCancel := context.WithCancel(ctx)
+				defer handleCtxCancel()
+
+				// Note that Handle will return irrespective of whether the request
+				// executes or is rejected. In the latter case, the function will return
+				// without calling the passed `execute` function.
+				fcIfc.Handle(handleCtx, digest, note, queueNote, execute)
+			}()
+
+			select {
+			case <-shouldStartWatchCh:
+				watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
+				watchReq = r.WithContext(watchCtx)
+				handler.ServeHTTP(w, watchReq)
+				// Protect from the situation when the request does not reach the storage layer
+				// and the initialization signal is never sent.
+				// It has to happen before waiting on the resultCh below.
+				watchInitializationSignal.Signal()
+				// TODO: Consider finishing the request as soon as the Handle call panics.
+				if err := <-resultCh; err != nil {
+					panic(err)
+				}
+			case err := <-resultCh:
+				if err != nil {
+					panic(err)
+				}
+			}
+		} else {
+			execute := func() {
+				noteExecutingDelta(1)
+				defer noteExecutingDelta(-1)
+				served = true
+				setResponseHeaders(classification, w)
+
+				handler.ServeHTTP(w, r)
+			}
+
+			fcIfc.Handle(ctx, digest, note, queueNote, execute)
+		}
 
 		if !served {
 			setResponseHeaders(classification, w)
@@ -187,26 +259,8 @@ func WithPriorityAndFairness(
 				epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
 			}
 			epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
-			if isWatchRequest {
-				close(resultCh)
-			}
 			tooManyRequests(r, w)
 		}
-
-		// For watch requests, from the APF point of view the request is already
-		// finished at this point. However, that doesn't mean it is already finished
-		// from the non-APF point of view. So we need to wait here until the request is:
-		// 1) finished being processed or
-		// 2) rejected
-		if isWatchRequest {
-			if forgetWatch != nil {
-				forgetWatch()
-			}
-			err := <-resultCh
-			if err != nil {
-				panic(err)
-			}
-		}
 	})
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index 680109e8c43..7ebf1c5167b 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -351,11 +351,31 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 	})
 }
 
+func TestApfCancelWaitRequest(t *testing.T) {
+	epmetrics.Register()
+
+	server := newApfServerWithSingleRequest(t, decisionCancelWait)
+	defer server.Close()
+
+	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
+		t.Error(err)
+	}
+
+	checkForExpectedMetrics(t, []string{
+		"apiserver_current_inflight_requests",
+		"apiserver_request_terminations_total",
+		"apiserver_dropped_requests_total",
+	})
+}
+
 type fakeWatchApfFilter struct {
 	lock     sync.Mutex
 	inflight int
 	capacity int
 
+	postExecutePanic bool
+	preExecutePanic  bool
+
 	utilflowcontrol.WatchTracker
 }
 
@@ -385,7 +405,13 @@ func (f *fakeWatchApfFilter) Handle(ctx context.Context,
 		return
 	}
 
+	if f.preExecutePanic {
+		panic("pre-exec-panic")
+	}
 	execFn()
+	if f.postExecutePanic {
+		panic("post-exec-panic")
+	}
 
 	f.lock.Lock()
 	defer f.lock.Unlock()
@@ -529,6 +555,53 @@ func TestApfWatchPanic(t *testing.T) {
 	}
 }
 
+func TestApfWatchHandlePanic(t *testing.T) {
+	preExecutePanicingFilter := newFakeWatchApfFilter(1)
+	preExecutePanicingFilter.preExecutePanic = true
+
+	postExecutePanicingFilter := newFakeWatchApfFilter(1)
+	postExecutePanicingFilter.postExecutePanic = true
+
+	testCases := []struct {
+		name   string
+		filter *fakeWatchApfFilter
+	}{
+		{
+			name:   "pre-execute panic",
+			filter: preExecutePanicingFilter,
+		},
+		{
+			name:   "post-execute panic",
+			filter: postExecutePanicingFilter,
+		},
+	}
+
+	onExecuteFunc := func() {
+		time.Sleep(5 * time.Second)
+	}
+	postExecuteFunc := func() {}
+
+	for _, test := range testCases {
+		t.Run(test.name, func(t *testing.T) {
+			apfHandler := newApfHandlerWithFilter(t, test.filter, onExecuteFunc, postExecuteFunc)
+			handler := func(w http.ResponseWriter, r *http.Request) {
+				defer func() {
+					if err := recover(); err == nil {
+						t.Errorf("expected panic, got %v", err)
+					}
+				}()
+				apfHandler.ServeHTTP(w, r)
+			}
+			server := httptest.NewServer(http.HandlerFunc(handler))
+			defer server.Close()
+
+			if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+		})
+	}
+}
+
 // TestContextClosesOnRequestProcessed ensures that the request context is cancelled
 // automatically even if the server doesn't cancel is explicitly.
 // This is required to ensure we won't be leaking goroutines that wait for context
@@ -556,23 +629,6 @@ func TestContextClosesOnRequestProcessed(t *testing.T) {
 	wg.Wait()
 }
 
-func TestApfCancelWaitRequest(t *testing.T) {
-	epmetrics.Register()
-
-	server := newApfServerWithSingleRequest(t, decisionCancelWait)
-	defer server.Close()
-
-	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
-		t.Error(err)
-	}
-
-	checkForExpectedMetrics(t, []string{
-		"apiserver_current_inflight_requests",
-		"apiserver_request_terminations_total",
-		"apiserver_dropped_requests_total",
-	})
-}
-
 type fakeFilterRequestDigest struct {
 	*fakeApfFilter
 	requestDigestGot *utilflowcontrol.RequestDigest
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
index f2e58a91bc2..8d914e796b6 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
@@ -49,6 +49,8 @@ type Interface interface {
 	// that the request should be executed then `execute()` will be
 	// invoked once to execute the request; otherwise `execute()` will
 	// not be invoked.
+	// Handle() should never return while execute() is running, even if
+	// ctx is cancelled or times out.
 	Handle(ctx context.Context,
 		requestDigest RequestDigest,
 		noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
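For context, the watch path changed above boils down to the goroutine/select pattern below. This is a minimal, self-contained Go sketch, not part of the change itself; handle, serveWatch, and the random rejection are hypothetical stand-ins for fcIfc.Handle and the filter, and the real code additionally registers the watch, plumbs the initialization signal, and propagates http.ErrAbortHandler unchanged.

// Sketch only: handle is a hypothetical stand-in for fcIfc.Handle. It either
// calls execute exactly once or returns without calling it (a rejection).
package main

import (
	"fmt"
	"math/rand"
)

func handle(execute func()) {
	if rand.Intn(2) == 0 {
		// Rejected: return without running execute, mirroring APF rejecting a request.
		return
	}
	execute()
}

// serveWatch mirrors the filter's watch path: handle runs in a side goroutine,
// while the main goroutine waits for either permission to start the watch
// (shouldStartWatchCh, closed inside execute) or for handle to finish without
// ever calling execute.
func serveWatch(serveHTTP func()) {
	shouldStartWatchCh := make(chan struct{})
	// Buffered so the side goroutine can always deliver its result and exit.
	resultCh := make(chan interface{}, 1)

	execute := func() {
		// Tell the main goroutine to run the actual handler; the real filter
		// also registers the watch and waits for its initialization here.
		close(shouldStartWatchCh)
	}

	go func() {
		defer func() {
			// Forward a panic (or nil) from handle to the main goroutine.
			resultCh <- recover()
		}()
		handle(execute)
	}()

	select {
	case <-shouldStartWatchCh:
		// Admitted: serve the watch in the main goroutine, then surface any
		// panic that handle produced after execute returned.
		serveHTTP()
		if err := <-resultCh; err != nil {
			panic(err)
		}
	case err := <-resultCh:
		// handle returned without calling execute (rejected) or panicked first.
		if err != nil {
			panic(err)
		}
		fmt.Println("request rejected")
	}
}

func main() {
	serveWatch(func() { fmt.Println("serving watch") })
}

The buffered resultCh plus the documented guarantee that Handle never returns while execute is running are what make the deferred forgetWatch cleanup in the filter race-free.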