Address watch panics in P&F handler and extend testing.

This commit is contained in:
wojtekt
2021-05-27 14:49:54 +02:00
parent 0cc217647c
commit d9d51541a8
2 changed files with 81 additions and 10 deletions

View File

@@ -19,12 +19,11 @@ package filters
import (
"fmt"
"net/http"
"sync"
"runtime"
"sync/atomic"
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
apitypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
@@ -112,7 +111,7 @@ func WithPriorityAndFairness(
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
wg := sync.WaitGroup{}
var resultCh chan interface{}
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
@@ -130,11 +129,22 @@ func WithPriorityAndFairness(
setResponseHeaders(classification, w)
if isWatchRequest {
wg.Add(1)
resultCh = make(chan interface{})
go func() {
defer utilruntime.HandleCrash()
defer func() {
err := recover()
// do not wrap the sentinel ErrAbortHandler panic value
if err != nil && err != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Sprintf("%v\n%s", err, buf)
}
resultCh <- err
}()
defer wg.Done()
// Protect from the situations when request will not reach storage layer
// and the initialization signal will not be send.
defer watchInitializationSignal.Signal()
@@ -165,12 +175,17 @@ func WithPriorityAndFairness(
}
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
if isWatchRequest {
wg.Done()
close(resultCh)
}
tooManyRequests(r, w)
}
// In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes.
wg.Wait()
if isWatchRequest {
err := <-resultCh
if err != nil {
panic(err)
}
}
})
}

View File

@@ -147,6 +147,11 @@ func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEn
}
func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server {
apfServer := httptest.NewServer(newApfHandlerWithFilter(flowControlFilter, onExecute, postExecute, t))
return apfServer
}
func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) http.Handler {
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
@@ -165,8 +170,7 @@ func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecu
}
}), requestInfoFactory)
apfServer := httptest.NewServer(handler)
return apfServer
return handler
}
func TestApfSkipLongRunningRequest(t *testing.T) {
@@ -469,6 +473,58 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
wg.Wait()
}
func TestApfWatchPanic(t *testing.T) {
fakeFilter := &fakeWatchApfFilter{
capacity: 1,
}
onExecuteFunc := func() {
panic("test panic")
}
postExecuteFunc := func() {}
apfHandler := newApfHandlerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
handler := func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err == nil {
t.Errorf("expected panic, got %v", err)
}
}()
apfHandler.ServeHTTP(w, r)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
// TestContextClosesOnRequestProcessed ensures that the request context is cancelled
// automatically even if the server doesn't cancel is explicitly.
// This is required to ensure we won't be leaking goroutines that wait for context
// cancelling (e.g. in queueset::StartRequest method).
func TestContextClosesOnRequestProcessed(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
handler := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// asynchronously wait for context being closed
go func() {
<-ctx.Done()
wg.Done()
}()
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Errorf("unexpected error: %v", err)
}
wg.Wait()
}
func TestApfCancelWaitRequest(t *testing.T) {
epmetrics.Register()