From d9d51541a87ec627160d7d6a1fcd4b357a0fa493 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Thu, 27 May 2021 14:49:54 +0200 Subject: [PATCH] Address watch panics in P&F handler and extend testing. --- .../server/filters/priority-and-fairness.go | 31 +++++++--- .../filters/priority-and-fairness_test.go | 60 ++++++++++++++++++- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index bc589299f3f..13658377c67 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -19,12 +19,11 @@ package filters import ( "fmt" "net/http" - "sync" + "runtime" "sync/atomic" flowcontrol "k8s.io/api/flowcontrol/v1beta1" apitypes "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" epmetrics "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" @@ -112,7 +111,7 @@ func WithPriorityAndFairness( waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta))) } } - wg := sync.WaitGroup{} + var resultCh chan interface{} execute := func() { noteExecutingDelta(1) defer noteExecutingDelta(-1) @@ -130,11 +129,22 @@ func WithPriorityAndFairness( setResponseHeaders(classification, w) if isWatchRequest { - wg.Add(1) + resultCh = make(chan interface{}) go func() { - defer utilruntime.HandleCrash() + defer func() { + err := recover() + // do not wrap the sentinel ErrAbortHandler panic value + if err != nil && err != http.ErrAbortHandler { + // Same as stdlib http server code. 
Manually allocate stack + // trace buffer size to prevent excessively large logs + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + err = fmt.Sprintf("%v\n%s", err, buf) + } + resultCh <- err + }() - defer wg.Done() // Protect from the situations when request will not reach storage layer // and the initialization signal will not be send. defer watchInitializationSignal.Signal() @@ -165,12 +175,17 @@ func WithPriorityAndFairness( } epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) if isWatchRequest { - wg.Done() + close(resultCh) } tooManyRequests(r, w) } // In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes. - wg.Wait() + if isWatchRequest { + err := <-resultCh + if err != nil { + panic(err) + } + } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 1e9a8537b16..376ab7ed389 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -147,6 +147,11 @@ func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEn } func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server { + apfServer := httptest.NewServer(newApfHandlerWithFilter(flowControlFilter, onExecute, postExecute, t)) + return apfServer +} + +func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) http.Handler { requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), 
sets.NewString("proxy")) @@ -165,8 +170,7 @@ func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecu } }), requestInfoFactory) - apfServer := httptest.NewServer(handler) - return apfServer + return handler } func TestApfSkipLongRunningRequest(t *testing.T) { @@ -469,6 +473,58 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { wg.Wait() } +func TestApfWatchPanic(t *testing.T) { + fakeFilter := &fakeWatchApfFilter{ + capacity: 1, + } + + onExecuteFunc := func() { + panic("test panic") + } + postExecuteFunc := func() {} + + apfHandler := newApfHandlerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t) + handler := func(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := recover(); err == nil { + t.Errorf("expected panic, got %v", err) + } + }() + apfHandler.ServeHTTP(w, r) + } + server := httptest.NewServer(http.HandlerFunc(handler)) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil { + t.Errorf("unexpected error: %v", err) + } +} + +// TestContextClosesOnRequestProcessed ensures that the request context is cancelled +// automatically even if the server doesn't cancel it explicitly. +// This is required to ensure we won't be leaking goroutines that wait for context +// cancelling (e.g. in queueset::StartRequest method). 
+func TestContextClosesOnRequestProcessed(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(1) + handler := func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + // asynchronously wait for context being closed + go func() { + <-ctx.Done() + wg.Done() + }() + } + server := httptest.NewServer(http.HandlerFunc(handler)) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil { + t.Errorf("unexpected error: %v", err) + } + + wg.Wait() +} + func TestApfCancelWaitRequest(t *testing.T) { epmetrics.Register()