Merge pull request #97206 from tkashem/panic

clean up executing request on panic
This commit is contained in:
Kubernetes Prow Robot 2020-12-22 10:26:26 -08:00 committed by GitHub
commit d815833a30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 376 additions and 8 deletions

View File

@ -21,10 +21,13 @@ go_test(
"//staging/src/k8s.io/api/flowcontrol/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
@ -34,7 +37,11 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//vendor/golang.org/x/net/http2:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],

View File

@ -21,12 +21,19 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
"testing"
"time"
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
@ -36,7 +43,11 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)
type mockDecision int
@ -333,6 +344,243 @@ func TestApfCancelWaitRequest(t *testing.T) {
})
}
// TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter wires the priority and
// fairness filter behind the timeout and panic recovery filters and verifies that
// a request whose handler panics does not cause the next request to be rejected
// when the priority level concurrency is 1.
func TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter(t *testing.T) {
	fcmetrics.Register()

	t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) {
		const (
			requestTimeout = time.Minute
			userName       = "alice"
			fsName         = "test-fs"
			plName         = "test-pl"

			serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1
		)

		objects := newConfiguration(fsName, plName, userName, flowcontrol.LimitResponseTypeReject, plConcurrencyShares)
		clientset := newClientset(t, objects...)
		// this test does not rely on resync, so resync period is set to zero
		factory := informers.NewSharedInformerFactory(clientset, 0)
		controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta1(), serverConcurrency, requestTimeout/4)

		stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
		factory.Start(stopCh)

		// wait for the informer cache to sync.
		timeout, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
		defer cancel()
		cacheSyncDone := factory.WaitForCacheSync(timeout.Done())
		if names := unsyncedInformers(cacheSyncDone); len(names) > 0 {
			t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
		}

		// controllerErr is written by the goroutine below and only read after
		// controllerCompletedCh has been closed.
		var controllerErr error
		go func() {
			defer close(controllerCompletedCh)
			controllerErr = controller.Run(stopCh)
		}()

		// make sure that apf controller syncs the priority level configuration object we are using in this test.
		// read the metrics and ensure the concurrency limit for our priority level is set to the expected value.
		pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
			if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil {
				t.Logf("polling retry - error: %s", err)
				return false, nil
			}
			return true, nil
		})
		if pollErr != nil {
			t.Fatalf("expected the apf controller to sync the priority level configuration object: %s, error: %v", plName, pollErr)
		}

		var executed bool
		// we will raise a panic for the first request.
		firstRequestPathPanic := "/request/panic"
		requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			executed = true
			expectMatchingAPFHeaders(t, w, fsName, plName)

			if r.URL.Path == firstRequestPathPanic {
				panic(fmt.Errorf("request handler panic'd as designed - %#v", r.RequestURI))
			}
		})
		handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)

		server, requestGetter := newHTTP2ServerWithClient(handler)
		defer server.Close()

		// the first request is expected to fail, so there is no response to inspect.
		_, err := requestGetter(firstRequestPathPanic)
		if !executed {
			t.Errorf("expected inner handler to be executed for request: %s", firstRequestPathPanic)
		}
		expectResetStreamError(t, err)

		executed = false
		// the second request should be served successfully.
		secondRequestPathShouldWork := "/request/should-work"
		response, err := requestGetter(secondRequestPathShouldWork)
		if !executed {
			t.Errorf("expected inner handler to be executed for request: %s", secondRequestPathShouldWork)
		}
		if err != nil {
			t.Errorf("expected request: %s to succeed, but got error: %#v", secondRequestPathShouldWork, err)
		} else {
			// only touch the response when the request succeeded: on error the
			// client returns a nil response and dereferencing it would panic.
			if response.StatusCode != http.StatusOK {
				t.Errorf("expected HTTP status code: %d for request: %s, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response)
			}
			if closeErr := response.Body.Close(); closeErr != nil {
				t.Logf("failed to close the response body for request: %s, error: %v", secondRequestPathShouldWork, closeErr)
			}
		}

		close(stopCh)
		t.Log("waiting for the controller to shutdown")
		<-controllerCompletedCh

		if controllerErr != nil {
			t.Errorf("expected a nil error from controller, but got: %#v", controllerErr)
		}
	})
}
// newHTTP2ServerWithClient starts a TLS test server with HTTP/2 enabled that
// serves the given handler. It returns the running server together with a
// function that issues a GET for the given path against that server using the
// server's own client. The caller is responsible for closing the server.
func newHTTP2ServerWithClient(handler http.Handler) (*httptest.Server, func(path string) (*http.Response, error)) {
	srv := httptest.NewUnstartedServer(handler)
	// HTTP/2 must be switched on before the server is started.
	srv.EnableHTTP2 = true
	srv.StartTLS()

	get := func(path string) (*http.Response, error) {
		target := srv.URL + path
		return srv.Client().Get(target)
	}
	return srv, get
}
// expectMatchingAPFHeaders verifies that the expected flow schema and priority
// level configuration UIDs are attached to the response header of w.
//
// Failures are reported with t.Error/t.Errorf rather than t.Fatal/t.Fatalf:
// this helper is called from within a request handler, which does not run on
// the goroutine executing the test function, and the testing package only
// permits FailNow (and hence Fatal) on the test goroutine.
func expectMatchingAPFHeaders(t *testing.T, w http.ResponseWriter, expectedFS, expectedPL string) {
	t.Helper()

	if w == nil {
		t.Error("expected a non nil HTTP response")
		// nothing further can be checked without a response writer.
		return
	}

	key := flowcontrol.ResponseHeaderMatchedFlowSchemaUID
	if value := w.Header().Get(key); expectedFS != value {
		t.Errorf("expected HTTP header %s to have value %q, but got: %q", key, expectedFS, value)
	}

	key = flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID
	if value := w.Header().Get(key); expectedPL != value {
		t.Errorf("expected HTTP header %s to have value %q, but got: %q", key, expectedPL, value)
	}
}
// expectResetStreamError fails the test unless err is a non-nil *url.Error
// whose message mentions INTERNAL_ERROR.
// when a request panics, http2 resets the stream with an INTERNAL_ERROR message
func expectResetStreamError(t *testing.T, err error) {
	if err == nil {
		t.Fatalf("expected the server to send an error, but got nil")
	}

	urlErr, isURLErr := err.(*url.Error)
	if !isURLErr {
		t.Fatalf("expected the error to be of type *url.Error, but got: %T", err)
	}

	if msg := urlErr.Error(); !strings.Contains(msg, "INTERNAL_ERROR") {
		t.Fatalf("expected a stream reset error, but got: %s", msg)
	}
}
// newClientset builds a fake clientset from the given objects and fails the
// test immediately if no clientset could be created.
func newClientset(t *testing.T, objects ...runtime.Object) clientset.Interface {
	fakeClient := fake.NewSimpleClientset(objects...)
	if fakeClient != nil {
		return fakeClient
	}

	t.Fatal("unable to create fake client set")
	return fakeClient
}
// newHandlerChain wraps handler in a chain of filters so the behavior of a real
// apiserver can be simulated. From the outermost filter inwards the chain is:
// panic recovery, request info, timeout for non long-running requests, a
// handler that adds the specified user as the authenticated user to the request
// context, priority and fairness, and finally the given handler.
func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.Interface, userName string, requestTimeout time.Duration) http.Handler {
	infoResolver := &apirequest.RequestInfoFactory{
		APIPrefixes:          sets.NewString("apis", "api"),
		GrouplessAPIPrefixes: sets.NewString("api"),
	}
	isLongRunning := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))

	withAPF := WithPriorityAndFairness(handler, isLongRunning, filter)

	// attach the authenticated user to every request before it reaches the
	// priority and fairness filter.
	withUser := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		authenticated := &user.DefaultInfo{
			Name:   userName,
			Groups: []string{user.AllAuthenticated},
		}
		ctx := apirequest.WithUser(req.Context(), authenticated)
		withAPF.ServeHTTP(w, req.WithContext(ctx))
	})

	withTimeout := WithTimeoutForNonLongRunningRequests(withUser, isLongRunning, requestTimeout)
	withRequestInfo := apifilters.WithRequestInfo(withTimeout, infoResolver)
	return WithPanicRecovery(withRequestInfo, infoResolver)
}
// unsyncedInformers returns a printable name for every type in status whose
// value is false (i.e. not synced). The result is never nil; its order follows
// map iteration and is therefore unspecified.
func unsyncedInformers(status map[reflect.Type]bool) []string {
	names := make([]string, 0, len(status))
	for objType, synced := range status {
		if synced {
			continue
		}

		name := objType.Name()
		if name == "" {
			// Name is empty for non-defined types such as pointers; fall back
			// to the full type string so the entry stays identifiable.
			// NOTE(review): informer object types are presumably pointers - confirm.
			name = objType.String()
		}
		names = append(names, name)
	}

	return names
}
// newConfiguration returns a flow schema and a limited priority level
// configuration, in that order. Each object uses its own name as its UID. The
// flow schema references the priority level by name, distinguishes flows by
// user, and has a single rule matching all verbs on all non-resource URLs for
// the given user. The priority level uses the given limit response type and
// takes concurrency as its assured concurrency shares.
func newConfiguration(fsName, plName, user string, responseType flowcontrol.LimitResponseType, concurrency int32) []runtime.Object {
	subject := flowcontrol.Subject{
		Kind: flowcontrol.SubjectKindUser,
		User: &flowcontrol.UserSubject{
			Name: user,
		},
	}
	nonResourceRule := flowcontrol.NonResourcePolicyRule{
		Verbs:           []string{flowcontrol.VerbAll},
		NonResourceURLs: []string{flowcontrol.NonResourceAll},
	}
	rule := flowcontrol.PolicyRulesWithSubjects{
		Subjects:         []flowcontrol.Subject{subject},
		NonResourceRules: []flowcontrol.NonResourcePolicyRule{nonResourceRule},
	}

	flowSchema := &flowcontrol.FlowSchema{
		ObjectMeta: metav1.ObjectMeta{
			Name: fsName,
			UID:  types.UID(fsName),
		},
		Spec: flowcontrol.FlowSchemaSpec{
			MatchingPrecedence: 1,
			PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
				Name: plName,
			},
			DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
				Type: flowcontrol.FlowDistinguisherMethodByUserType,
			},
			Rules: []flowcontrol.PolicyRulesWithSubjects{rule},
		},
	}

	limited := &flowcontrol.LimitedPriorityLevelConfiguration{
		AssuredConcurrencyShares: concurrency,
		LimitResponse: flowcontrol.LimitResponse{
			Type: responseType,
		},
	}
	priorityLevel := &flowcontrol.PriorityLevelConfiguration{
		ObjectMeta: metav1.ObjectMeta{
			Name: plName,
			UID:  types.UID(plName),
		},
		Spec: flowcontrol.PriorityLevelConfigurationSpec{
			Type:    flowcontrol.PriorityLevelEnablementLimited,
			Limited: limited,
		},
	}

	return []runtime.Object{flowSchema, priorityLevel}
}
// gathers and checks the metrics.
func checkForExpectedMetrics(t *testing.T, expectedMetrics []string) {
metricsFamily, err := legacyregistry.DefaultGatherer.Gather()
@ -353,3 +601,40 @@ func checkForExpectedMetrics(t *testing.T, expectedMetrics []string) {
}
}
}
// gaugeValueMatch gathers metrics from the legacy registry and checks the gauge
// metrics of the family called name whose labels match labelFilter. The gauge
// values are converted to int and summed; an error is returned when gathering
// fails, when the family is absent, when no metric matches the labels, or when
// the sum differs from wantValue.
func gaugeValueMatch(name string, labelFilter map[string]string, wantValue int) error {
	families, err := legacyregistry.DefaultGatherer.Gather()
	if err != nil {
		return fmt.Errorf("failed to gather metrics: %s", err)
	}

	var (
		total       int
		foundFamily bool
		foundLabels bool
	)
	for _, family := range families {
		if family.GetName() == name {
			foundFamily = true
			for _, m := range family.GetMetric() {
				if testutil.LabelsMatch(m, labelFilter) {
					foundLabels = true
					total += int(m.GetGauge().GetValue())
				}
			}
		}
	}

	switch {
	case !foundFamily:
		return fmt.Errorf("expected to find the metric family: %s in the gathered result", name)
	case !foundLabels:
		return fmt.Errorf("expected to find metrics with matching labels: %#+v", labelFilter)
	case wantValue != total:
		return fmt.Errorf("expected the sum to be: %d, but got: %d for gauge metric: %s with labels %#+v", wantValue, total, name, labelFilter)
	}
	return nil
}

View File

@ -130,21 +130,28 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued)
var executed bool
idle := req.Finish(func() {
idle, panicking := true, true
defer func() {
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => panicking=%v idle=%v",
requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, panicking, idle)
if idle {
cfgCtlr.maybeReap(pl.Name)
}
}()
idle = req.Finish(func() {
if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
metrics.AddDispatch(pl.Name, fs.Name)
executed = true
startExecutionTime := time.Now()
defer func() {
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
}()
execFn()
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
})
if queued && !executed {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle)
if idle {
cfgCtlr.maybeReap(pl.Name)
}
panicking = false
}

View File

@ -316,8 +316,15 @@ func (req *request) Finish(execFn func()) bool {
if !exec {
return idle
}
execFn()
return req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
func() {
defer func() {
idle = req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
}()
execFn()
}()
return idle
}
func (req *request) wait() (bool, bool) {

View File

@ -18,6 +18,7 @@ package queueset
import (
"context"
"errors"
"fmt"
"math"
"reflect"
@ -714,6 +715,67 @@ func TestContextCancel(t *testing.T) {
}
}
// TestTotalRequestsExecutingWithPanic starts a single request on a queue set
// with a concurrency limit of 1, panics inside the function passed to Finish,
// and verifies that the queue set's count of executing requests is 1 while the
// function runs and back to 0 once the panic has propagated out of Finish.
func TestTotalRequestsExecutingWithPanic(t *testing.T) {
	metrics.Register()
	metrics.Reset()

	clk, counter := testclock.NewFakeEventClock(time.Now(), 0, nil)
	factory := NewQueueSetFactory(clk, counter)
	queuingCfg := fq.QueuingConfig{
		Name:             "TestTotalRequestsExecutingWithPanic",
		DesiredNumQueues: 0,
		RequestWaitLimit: 15 * time.Second,
	}
	completer, err := factory.BeginConstruction(queuingCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := completer.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
	counter.Add(1) // account for the goroutine running this test

	queue, isQueueSet := qs.(*queueSet)
	if !isQueueSet {
		t.Fatalf("expected a QueueSet of type: %T but got: %T", &queueSet{}, qs)
	}
	if queue.totRequestsExecuting != 0 {
		t.Fatalf("precondition: expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting)
	}
	if queue.dCfg.ConcurrencyLimit != 1 {
		t.Fatalf("precondition: expected concurrency limit of the QueueSet to be 1, but got: %d", queue.dCfg.ConcurrencyLimit)
	}

	req, _ := qs.StartRequest(context.Background(), 1, "", "fs", "test", "one", func(inQueue bool) {})
	if req == nil {
		t.Fatal("expected a Request object from StartRequest, but got nil")
	}

	wantPanic := errors.New("apiserver panic'd")
	// run Finish in its own function so the panic raised below can be
	// recovered and inspected here.
	gotPanic := func() (recovered interface{}) {
		defer func() {
			recovered = recover()
		}()

		req.Finish(func() {
			// verify that total requests executing goes up by 1 since the request is executing.
			if queue.totRequestsExecuting != 1 {
				t.Fatalf("expected total requests currently executing of the QueueSet to be 1, but got: %d", queue.totRequestsExecuting)
			}

			panic(wantPanic)
		})
		return nil
	}()

	// verify that the panic was from us (above)
	if wantPanic != gotPanic {
		t.Errorf("expected panic error: %#v, but got: %#v", wantPanic, gotPanic)
	}
	if queue.totRequestsExecuting != 0 {
		t.Errorf("expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting)
	}
}
// newObserverPair generates a timed observer pair from the priority level
// concurrency generator, using the fixed arguments 1, 1 and the single label
// value "test". The clk argument is not used by this helper.
func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
	labelValues := []string{"test"}
	return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelValues)
}