diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
index 831549636c2..5d5cefaf63a 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
@@ -21,10 +21,13 @@ go_test(
         "//staging/src/k8s.io/api/flowcontrol/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
@@ -34,7 +37,11 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
+        "//staging/src/k8s.io/client-go/informers:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
+        "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
         "//vendor/golang.org/x/net/http2:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
     ],
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index f09a56ce178..8cd8c867a38 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -21,12 +21,19 @@ import (
     "fmt"
     "net/http"
     "net/http/httptest"
+    "net/url"
+    "reflect"
+    "strings"
     "sync"
     "testing"
     "time"

     flowcontrol "k8s.io/api/flowcontrol/v1beta1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
     "k8s.io/apiserver/pkg/authentication/user"
     apifilters "k8s.io/apiserver/pkg/endpoints/filters"
@@ -36,7 +43,11 @@ import (
     utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
     fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
     fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
+    "k8s.io/client-go/informers"
+    clientset "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/kubernetes/fake"
     "k8s.io/component-base/metrics/legacyregistry"
+    "k8s.io/component-base/metrics/testutil"
 )

 type mockDecision int
@@ -333,6 +344,243 @@ func TestApfCancelWaitRequest(t *testing.T) {
     })
 }

+func TestPriorityAndFairnessWithPanicRecoverAndTimeoutFilter(t *testing.T) {
+    fcmetrics.Register()
+
+    t.Run("priority level concurrency is set to 1, request handler panics, next request should not be rejected", func(t *testing.T) {
+        const (
+            requestTimeout                                        = time.Minute
+            userName                                              = "alice"
+            fsName                                                = "test-fs"
+            plName                                                = "test-pl"
+            serverConcurrency, plConcurrencyShares, plConcurrency = 1, 1, 1
+        )
+
+        objects := newConfiguration(fsName, plName, userName, flowcontrol.LimitResponseTypeReject, plConcurrencyShares)
+        clientset := newClientset(t, objects...)
+        // this test does not rely on resync, so resync period is set to zero
+        factory := informers.NewSharedInformerFactory(clientset, 0)
+        controller := utilflowcontrol.New(factory, clientset.FlowcontrolV1beta1(), serverConcurrency, requestTimeout/4)
+
+        stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
+        factory.Start(stopCh)
+
+        // wait for the informer cache to sync.
+        timeout, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+        defer cancel()
+        cacheSyncDone := factory.WaitForCacheSync(timeout.Done())
+        if names := unsyncedInformers(cacheSyncDone); len(names) > 0 {
+            t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
+        }
+
+        var controllerErr error
+        go func() {
+            defer close(controllerCompletedCh)
+            controllerErr = controller.Run(stopCh)
+        }()
+
+        // make sure that apf controller syncs the priority level configuration object we are using in this test.
+        // read the metrics and ensure the concurrency limit for our priority level is set to the expected value.
+        pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
+            if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil {
+                t.Logf("polling retry - error: %s", err)
+                return false, nil
+            }
+            return true, nil
+        })
+        if pollErr != nil {
+            t.Fatalf("expected the apf controller to sync the priority level configuration object: %s", "test-pl")
+        }
+
+        var executed bool
+        // we will raise a panic for the first request.
+        firstRequestPathPanic := "/request/panic"
+        requestHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+            executed = true
+            expectMatchingAPFHeaders(t, w, fsName, plName)
+
+            if r.URL.Path == firstRequestPathPanic {
+                panic(fmt.Errorf("request handler panic'd as designed - %#v", r.RequestURI))
+            }
+        })
+        handler := newHandlerChain(t, requestHandler, controller, userName, requestTimeout)
+
+        server, requestGetter := newHTTP2ServerWithClient(handler)
+        defer server.Close()
+
+        var err error
+        _, err = requestGetter(firstRequestPathPanic)
+        if !executed {
+            t.Errorf("expected inner handler to be executed for request: %s", firstRequestPathPanic)
+        }
+        expectResetStreamError(t, err)
+
+        executed = false
+        // the second request should be served successfully.
+        secondRequestPathShouldWork := "/request/should-work"
+        response, err := requestGetter(secondRequestPathShouldWork)
+        if !executed {
+            t.Errorf("expected inner handler to be executed for request: %s", secondRequestPathShouldWork)
+        }
+        if err != nil {
+            t.Errorf("expected request: %s to succeed, but got error: %#v", secondRequestPathShouldWork, err)
+        }
+        if response.StatusCode != http.StatusOK {
+            t.Errorf("expected HTTP status code: %d for request: %s, but got: %#v", http.StatusOK, secondRequestPathShouldWork, response)
+        }
+
+        close(stopCh)
+        t.Log("waiting for the controller to shutdown")
+        <-controllerCompletedCh
+
+        if controllerErr != nil {
+            t.Errorf("expected a nil error from controller, but got: %#v", controllerErr)
+        }
+    })
+}
+
+// returns a started http2 server, with a client function to send request to the server.
+func newHTTP2ServerWithClient(handler http.Handler) (*httptest.Server, func(path string) (*http.Response, error)) {
+    server := httptest.NewUnstartedServer(handler)
+    server.EnableHTTP2 = true
+    server.StartTLS()
+
+    return server, func(path string) (*http.Response, error) {
+        return server.Client().Get(server.URL + path)
+    }
+}
+
+// verifies that the expected flow schema and priority level UIDs are attached to the header.
+func expectMatchingAPFHeaders(t *testing.T, w http.ResponseWriter, expectedFS, expectedPL string) {
+    if w == nil {
+        t.Fatal("expected a non nil HTTP response")
+    }
+
+    key := flowcontrol.ResponseHeaderMatchedFlowSchemaUID
+    if value := w.Header().Get(key); expectedFS != value {
+        t.Fatalf("expected HTTP header %s to have value %q, but got: %q", key, expectedFS, value)
+    }
+
+    key = flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID
+    if value := w.Header().Get(key); expectedPL != value {
+        t.Fatalf("expected HTTP header %s to have value %q, but got %q", key, expectedPL, value)
+    }
+}
+
+// when a request panics, http2 resets the stream with an INTERNAL_ERROR message
+func expectResetStreamError(t *testing.T, err error) {
+    if err == nil {
+        t.Fatalf("expected the server to send an error, but got nil")
+    }
+
+    uerr, ok := err.(*url.Error)
+    if !ok {
+        t.Fatalf("expected the error to be of type *url.Error, but got: %T", err)
+    }
+    if !strings.Contains(uerr.Error(), "INTERNAL_ERROR") {
+        t.Fatalf("expected a stream reset error, but got: %s", uerr.Error())
+    }
+}
+
+func newClientset(t *testing.T, objects ...runtime.Object) clientset.Interface {
+    clientset := fake.NewSimpleClientset(objects...)
+    if clientset == nil {
+        t.Fatal("unable to create fake client set")
+    }
+    return clientset
+}
+
+// builds a chain of handlers that include the panic recovery and timeout filter, so we can simulate the behavior of
+// a real apiserver.
+// the specified user is added as the authenticated user to the request context.
+func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.Interface, userName string, requestTimeout time.Duration) http.Handler {
+    requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
+    longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
+
+    apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter)
+
+    // add the handler in the chain that adds the specified user to the request context
+    handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
+            Name:   userName,
+            Groups: []string{user.AllAuthenticated},
+        }))
+
+        apfHandler.ServeHTTP(w, r)
+    })
+
+    handler = WithTimeoutForNonLongRunningRequests(handler, longRunningRequestCheck, requestTimeout)
+    handler = apifilters.WithRequestInfo(handler, requestInfoFactory)
+    handler = WithPanicRecovery(handler, requestInfoFactory)
+    return handler
+}
+
+func unsyncedInformers(status map[reflect.Type]bool) []string {
+    names := make([]string, 0)
+
+    for objType, synced := range status {
+        if !synced {
+            names = append(names, objType.Name())
+        }
+    }
+
+    return names
+}
+
+func newConfiguration(fsName, plName, user string, responseType flowcontrol.LimitResponseType, concurrency int32) []runtime.Object {
+    fs := &flowcontrol.FlowSchema{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: fsName,
+            UID:  types.UID(fsName),
+        },
+        Spec: flowcontrol.FlowSchemaSpec{
+            MatchingPrecedence: 1,
+            PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
+                Name: plName,
+            },
+            DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
+                Type: flowcontrol.FlowDistinguisherMethodByUserType,
+            },
+            Rules: []flowcontrol.PolicyRulesWithSubjects{
+                {
+                    Subjects: []flowcontrol.Subject{
+                        {
+                            Kind: flowcontrol.SubjectKindUser,
+                            User: &flowcontrol.UserSubject{
+                                Name: user,
+                            },
+                        },
+                    },
+                    NonResourceRules: []flowcontrol.NonResourcePolicyRule{
+                        {
+                            Verbs:           []string{flowcontrol.VerbAll},
+                            NonResourceURLs: []string{flowcontrol.NonResourceAll},
+                        },
+                    },
+                },
+            },
+        },
+    }
+
+    pl := &flowcontrol.PriorityLevelConfiguration{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: plName,
+            UID:  types.UID(plName),
+        },
+        Spec: flowcontrol.PriorityLevelConfigurationSpec{
+            Type: flowcontrol.PriorityLevelEnablementLimited,
+            Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
+                AssuredConcurrencyShares: concurrency,
+                LimitResponse: flowcontrol.LimitResponse{
+                    Type: responseType,
+                },
+            },
+        },
+    }
+
+    return []runtime.Object{fs, pl}
+}
+
 // gathers and checks the metrics.
 func checkForExpectedMetrics(t *testing.T, expectedMetrics []string) {
     metricsFamily, err := legacyregistry.DefaultGatherer.Gather()
@@ -353,3 +601,40 @@
         }
     }
 }
+
+// gaugeValueMatch ensures that the value of gauge metrics matching the labelFilter is as expected.
+func gaugeValueMatch(name string, labelFilter map[string]string, wantValue int) error {
+    metrics, err := legacyregistry.DefaultGatherer.Gather()
+    if err != nil {
+        return fmt.Errorf("failed to gather metrics: %s", err)
+    }
+
+    sum := 0
+    familyMatch, labelMatch := false, false
+    for _, mf := range metrics {
+        if mf.GetName() != name {
+            continue
+        }
+
+        familyMatch = true
+        for _, metric := range mf.GetMetric() {
+            if !testutil.LabelsMatch(metric, labelFilter) {
+                continue
+            }
+
+            labelMatch = true
+            sum += int(metric.GetGauge().GetValue())
+        }
+    }
+    if !familyMatch {
+        return fmt.Errorf("expected to find the metric family: %s in the gathered result", name)
+    }
+    if !labelMatch {
+        return fmt.Errorf("expected to find metrics with matching labels: %#+v", labelFilter)
+    }
+    if wantValue != sum {
+        return fmt.Errorf("expected the sum to be: %d, but got: %d for gauge metric: %s with labels %#+v", wantValue, sum, name, labelFilter)
+    }
+
+    return nil
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
index b95c790d073..a6afb2b24ca 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
@@ -130,21 +130,28 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
     }
     klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued)
     var executed bool
-    idle := req.Finish(func() {
+    idle, panicking := true, true
+    defer func() {
+        klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => panicking=%v idle=%v",
+            requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, panicking, idle)
+        if idle {
+            cfgCtlr.maybeReap(pl.Name)
+        }
+    }()
+    idle = req.Finish(func() {
         if queued {
             metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
         }
         metrics.AddDispatch(pl.Name, fs.Name)
         executed = true
         startExecutionTime := time.Now()
+        defer func() {
+            metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
+        }()
         execFn()
-        metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
     })
     if queued && !executed {
         metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
     }
-    klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle)
-    if idle {
-        cfgCtlr.maybeReap(pl.Name)
-    }
+    panicking = false
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index 2f84a3f9e67..acdec4bba1f 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -316,8 +316,15 @@ func (req *request) Finish(execFn func()) bool {
     if !exec {
         return idle
     }
-    execFn()
-    return req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
+    func() {
+        defer func() {
+            idle = req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
+        }()
+
+        execFn()
+    }()
+
+    return idle
 }

 func (req *request) wait() (bool, bool) {
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
index 57b42894b5b..406e820b4bf 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
@@ -18,6 +18,7 @@ package queueset

 import (
     "context"
+    "errors"
     "fmt"
     "math"
     "reflect"
@@ -714,6 +715,67 @@ func TestContextCancel(t *testing.T) {
     }
 }

+func TestTotalRequestsExecutingWithPanic(t *testing.T) {
+    metrics.Register()
+    metrics.Reset()
+    now := time.Now()
+    clk, counter := testclock.NewFakeEventClock(now, 0, nil)
+    qsf := NewQueueSetFactory(clk, counter)
+    qCfg := fq.QueuingConfig{
+        Name:             "TestTotalRequestsExecutingWithPanic",
+        DesiredNumQueues: 0,
+        RequestWaitLimit: 15 * time.Second,
+    }
+    qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+    if err != nil {
+        t.Fatal(err)
+    }
+    qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
+    counter.Add(1) // account for the goroutine running this test
+
+    queue, ok := qs.(*queueSet)
+    if !ok {
+        t.Fatalf("expected a QueueSet of type: %T but got: %T", &queueSet{}, qs)
+    }
+    if queue.totRequestsExecuting != 0 {
+        t.Fatalf("precondition: expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting)
+    }
+    if queue.dCfg.ConcurrencyLimit != 1 {
+        t.Fatalf("precondition: expected concurrency limit of the QueueSet to be 1, but got: %d", queue.dCfg.ConcurrencyLimit)
+    }
+
+    ctx := context.Background()
+    req, _ := qs.StartRequest(ctx, 1, "", "fs", "test", "one", func(inQueue bool) {})
+    if req == nil {
+        t.Fatal("expected a Request object from StartRequest, but got nil")
+    }
+
+    panicErrExpected := errors.New("apiserver panic'd")
+    var panicErrGot interface{}
+    func() {
+        defer func() {
+            panicErrGot = recover()
+        }()
+
+        req.Finish(func() {
+            // verify that total requests executing goes up by 1 since the request is executing.
+            if queue.totRequestsExecuting != 1 {
+                t.Fatalf("expected total requests currently executing of the QueueSet to be 1, but got: %d", queue.totRequestsExecuting)
+            }
+
+            panic(panicErrExpected)
+        })
+    }()
+
+    // verify that the panic was from us (above)
+    if panicErrExpected != panicErrGot {
+        t.Errorf("expected panic error: %#v, but got: %#v", panicErrExpected, panicErrGot)
+    }
+    if queue.totRequestsExecuting != 0 {
+        t.Errorf("expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting)
+    }
+}
+
 func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
     return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
 }
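
The apf_filter.go and queueset.go hunks above share one technique: post-execution bookkeeping (execution-duration metrics, the executing-request count, and the idle/reap check) is moved into defer blocks so it still runs when execFn panics, while the panic itself keeps unwinding to the WithPanicRecovery filter. A minimal standalone sketch of that pattern follows, assuming nothing beyond the Go standard library; the runWithAccounting name and the plain integer counter are illustrative stand-ins, not code from this patch.

    package main

    import (
        "fmt"
        "time"
    )

    // runWithAccounting mimics request.Finish: it marks one request as executing,
    // runs execFn, and uses defer so the accounting is undone and the duration is
    // observed even if execFn panics. The panic keeps propagating to the caller.
    func runWithAccounting(executing *int, execFn func()) (idle bool) {
        *executing++
        start := time.Now()
        defer func() {
            *executing--
            idle = *executing == 0
            fmt.Printf("execution took %v, idle=%v\n", time.Since(start), idle)
        }()
        execFn()
        return
    }

    func main() {
        executing := 0
        defer func() {
            if r := recover(); r != nil {
                // By the time the recovery layer sees the panic, the executing
                // count has already been restored to 0.
                fmt.Println("recovered:", r, "executing:", executing)
            }
        }()
        runWithAccounting(&executing, func() { panic("handler panicked") })
    }

Without the defer, a panicking handler would leave the count permanently inflated, which is exactly the totRequestsExecuting leak that TestTotalRequestsExecutingWithPanic guards against.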