From 0cc217647ca8be0820973b970124a072c27b6575 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Fri, 7 May 2021 12:49:06 +0200 Subject: [PATCH 1/3] Implement support for watch initialization in P&F --- .../pkg/server/filters/maxinflight.go | 5 +- .../server/filters/priority-and-fairness.go | 59 +++++-- .../filters/priority-and-fairness_test.go | 147 +++++++++++++++++- .../apiserver/pkg/storage/cacher/cacher.go | 9 ++ .../storage/cacher/cacher_whitebox_test.go | 21 +++ .../apiserver/pkg/storage/etcd3/watcher.go | 9 ++ .../pkg/storage/etcd3/watcher_test.go | 18 +++ .../pkg/util/flowcontrol/apf_context.go | 82 ++++++++++ 8 files changed, 326 insertions(+), 24 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index 2484bfc76c8..70c8d8b855c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -47,7 +47,10 @@ const ( observationMaintenancePeriod = 10 * time.Second ) -var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch") +var ( + nonMutatingRequestVerbs = sets.NewString("get", "list", "watch") + watchVerbs = sets.NewString("watch") +) func handleError(w http.ResponseWriter, r *http.Request, err error) { errorMsg := fmt.Sprintf("Internal Server Error: %#v", r.RequestURI) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 186824e2f26..bc589299f3f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -17,13 +17,14 @@ limitations under the License. package filters import ( - "context" "fmt" "net/http" + "sync" "sync/atomic" flowcontrol "k8s.io/api/flowcontrol/v1beta1" apitypes "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" epmetrics "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" @@ -31,10 +32,6 @@ import ( "k8s.io/klog/v2" ) -type priorityAndFairnessKeyType int - -const priorityAndFairnessKey priorityAndFairnessKeyType = iota - // PriorityAndFairnessClassification identifies the results of // classification for API Priority and Fairness type PriorityAndFairnessClassification struct { @@ -44,12 +41,6 @@ type PriorityAndFairnessClassification struct { PriorityLevelUID apitypes.UID } -// GetClassification returns the classification associated with the -// given context, if any, otherwise nil -func GetClassification(ctx context.Context) *PriorityAndFairnessClassification { - return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification) -} - // waitingMark tracks requests waiting rather than being executed var waitingMark = &requestWatermark{ phase: epmetrics.WaitingPhase, @@ -60,6 +51,9 @@ var waitingMark = &requestWatermark{ var atomicMutatingExecuting, atomicReadOnlyExecuting int32 var atomicMutatingWaiting, atomicReadOnlyWaiting int32 +// newInitializationSignal is defined for testing purposes. +var newInitializationSignal = utilflowcontrol.NewInitializationSignal + // WithPriorityAndFairness limits the number of in-flight // requests in a fine-grained way. 
func WithPriorityAndFairness(
@@ -84,8 +78,10 @@ func WithPriorityAndFairness(
 return
 }
- // Skip tracking long running requests.
- if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
+ isWatchRequest := watchVerbs.Has(requestInfo.Verb)
+
+ // Skip tracking long running non-watch requests.
+ if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
 klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
 handler.ServeHTTP(w, r)
 return
@@ -116,15 +112,40 @@ func WithPriorityAndFairness(
 waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
 }
 }
+ wg := sync.WaitGroup{}
 execute := func() {
 noteExecutingDelta(1)
 defer noteExecutingDelta(-1)
 served = true
- innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
- innerReq := r.Clone(innerCtx)
+
+ innerCtx := ctx
+ innerReq := r
+
+ var watchInitializationSignal utilflowcontrol.InitializationSignal
+ if isWatchRequest {
+ watchInitializationSignal = newInitializationSignal()
+ innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
+ innerReq = r.Clone(innerCtx)
+ }
 setResponseHeaders(classification, w)
- handler.ServeHTTP(w, innerReq)
+ if isWatchRequest {
+ wg.Add(1)
+ go func() {
+ defer utilruntime.HandleCrash()
+
+ defer wg.Done()
+ // Protect from situations when the request will not reach the storage layer
+ // and the initialization signal will not be sent.
+ defer watchInitializationSignal.Signal()
+
+ handler.ServeHTTP(w, innerReq)
+ }()
+
+ watchInitializationSignal.Wait()
+ } else {
+ handler.ServeHTTP(w, innerReq)
+ }
 }
 digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user}
 fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
@@ -143,9 +164,13 @@ func WithPriorityAndFairness(
 epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
 }
 epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
+ if isWatchRequest {
+ wg.Done()
+ }
 tooManyRequests(r, w)
 }
-
+ // In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes.
+ wg.Wait()
 })
}
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index 0ef6345f34b..1e9a8537b16 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -138,16 +138,21 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptes
 }

 func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
+ fakeFilter := fakeApfFilter{
+ mockDecision: decision,
+ postEnqueue: postEnqueue,
+ postDequeue: postDequeue,
+ }
+ return newApfServerWithFilter(fakeFilter, onExecute, postExecute, t)
+}
+
+func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server {
 requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
 longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))

 apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 onExecute()
- }), longRunningRequestCheck, fakeApfFilter{
- mockDecision: decision,
- postEnqueue: postEnqueue,
- postDequeue: postDequeue,
- })
+ }), longRunningRequestCheck, flowControlFilter)

 handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@@ -175,7 +180,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
 StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())

 // send a watch request to test skipping long running request
- if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
+ if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/foos/foo/proxy", server.URL), http.StatusOK); err != nil {
 // request should not be rejected
 t.Error(err)
 }
@@ -334,6 +339,136 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 })
}

+type fakeWatchApfFilter struct {
+ lock sync.Mutex
+ inflight int
+ capacity int
+}
+
+func (f *fakeWatchApfFilter) Handle(ctx context.Context,
+ requestDigest utilflowcontrol.RequestDigest,
+ _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
+ _ fq.QueueNoteFn,
+ execFn func(),
+) {
+ canExecute := false
+ func() {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ if f.inflight < f.capacity {
+ f.inflight++
+ canExecute = true
+ }
+ }()
+ if !canExecute {
+ return
+ }
+
+ execFn()
+
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.inflight--
+}
+
+func (f *fakeWatchApfFilter) MaintainObservations(stopCh <-chan struct{}) {
+}
+
+func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
+ return nil
+}
+
+func (f *fakeWatchApfFilter) Install(c *mux.PathRecorderMux) {
+}
+
+func (f *fakeWatchApfFilter) wait() error {
+ return wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ return f.inflight == 0, nil
+ })
+}
+
+func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
+ signalsLock := sync.Mutex{}
+ signals := []utilflowcontrol.InitializationSignal{}
+ sendSignals := func() {
+ signalsLock.Lock()
+ defer signalsLock.Unlock()
+ for i := range signals {
+ signals[i].Signal()
+ }
+ signals =
signals[:0]
+ }
+
+ newInitializationSignal = func() utilflowcontrol.InitializationSignal {
+ signalsLock.Lock()
+ defer signalsLock.Unlock()
+ signal := utilflowcontrol.NewInitializationSignal()
+ signals = append(signals, signal)
+ return signal
+ }
+ defer func() {
+ newInitializationSignal = utilflowcontrol.NewInitializationSignal
+ }()
+
+ // We test that, after receiving the initialization signal, new
+ // requests are allowed to run by:
+ // - sending N requests that will occupy the whole capacity
+ // - sending initialization signals for them
+ // - ensuring that the number of inflight requests drops to zero
+ concurrentRequests := 5
+ firstRunning := sync.WaitGroup{}
+ firstRunning.Add(concurrentRequests)
+ allRunning := sync.WaitGroup{}
+ allRunning.Add(2 * concurrentRequests)
+
+ fakeFilter := &fakeWatchApfFilter{
+ capacity: concurrentRequests,
+ }
+
+ onExecuteFunc := func() {
+ firstRunning.Done()
+ firstRunning.Wait()
+
+ sendSignals()
+ fakeFilter.wait()
+
+ allRunning.Done()
+ allRunning.Wait()
+ }
+
+ postExecuteFunc := func() {}
+
+ server := newApfServerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
+ defer server.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(2 * concurrentRequests)
+ for i := 0; i < concurrentRequests; i++ {
+ go func() {
+ defer wg.Done()
+ if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
+ t.Error(err)
+ }
+ }()
+ }
+
+ firstRunning.Wait()
+ fakeFilter.wait()
+
+ firstRunning.Add(concurrentRequests)
+ for i := 0; i < concurrentRequests; i++ {
+ go func() {
+ defer wg.Done()
+ if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
+ t.Error(err)
+ }
+ }()
+ }
+ wg.Wait()
+}
+
 func TestApfCancelWaitRequest(t *testing.T) {
 epmetrics.Register()

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 23ffc3ae31e..39e8db0b767 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -38,6 +38,7 @@ import (
 "k8s.io/apiserver/pkg/features"
 "k8s.io/apiserver/pkg/storage"
 utilfeature "k8s.io/apiserver/pkg/util/feature"
+ utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 "k8s.io/client-go/tools/cache"
 "k8s.io/klog/v2"
 utiltrace "k8s.io/utils/trace"
@@ -1413,6 +1414,14 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
 klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
 }

+ // At this point we already start processing incoming watch events.
+ // However, init events may still be being processed, because their serialization
+ // and sending to the client happen asynchronously.
+ // TODO: As described in the KEP, we would like to estimate that by delaying
+ // the initialization signal proportionally to the number of events to
+ // process, but we're leaving this to the tuning phase.
+ utilflowcontrol.WatchInitialized(ctx) + defer close(c.result) defer c.Stop() for { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index f3a6c08634b..69fc6e38e3c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -42,6 +42,7 @@ import ( "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" ) var ( @@ -701,6 +702,26 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { } } +func TestWatchInitializationSignal(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + initSignal := utilflowcontrol.NewInitializationSignal() + ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal) + + _, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + initSignal.Wait() +} + func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBookmarks bool) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index bd87382e83f..0a6f4bc3c15 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -32,6 +32,7 @@ import ( "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/value" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "go.etcd.io/etcd/clientv3" "k8s.io/klog/v2" @@ -120,6 +121,14 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p } wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred) go wc.run() + + // For etcd watch we don't have an easy way to answer whether the watch + // has already caught up. So in the initial version (given that watchcache + // is by default enabled for all resources but Events), we just deliver + // the initialization signal immediately. Improving this will be explored + // in the future. 
+ utilflowcontrol.WatchInitialized(ctx)
+
 return wc, nil
}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
index a141aaafc38..89a5e7b3f82 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
@@ -38,6 +38,7 @@ import (
 "k8s.io/apiserver/pkg/apis/example"
 examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
 "k8s.io/apiserver/pkg/storage"
+ utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 )

 func TestWatch(t *testing.T) {
@@ -313,6 +314,23 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
 }
}

+func TestWatchInitializationSignal(t *testing.T) {
+ _, store, cluster := testSetup(t)
+ defer cluster.Terminate(t)
+
+ ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+ initSignal := utilflowcontrol.NewInitializationSignal()
+ ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
+
+ key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
+ _, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
+ if err != nil {
+ t.Fatalf("Watch failed: %v", err)
+ }
+
+ initSignal.Wait()
+}
+
 func TestProgressNotify(t *testing.T) {
 codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
 clusterConfig := &integration.ClusterConfig{
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go
new file mode 100644
index 00000000000..6497e3fff5e
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go
@@ -0,0 +1,82 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package flowcontrol
+
+import (
+ "context"
+ "sync"
+)
+
+type priorityAndFairnessKeyType int
+
+const (
+ // priorityAndFairnessInitializationSignalKey is a key under which
+ // the initialization signal for watch requests is stored
+ // in the context.
+ priorityAndFairnessInitializationSignalKey priorityAndFairnessKeyType = iota
+)
+
+// WithInitializationSignal creates a copy of the parent context with
+// the priority and fairness initialization signal attached.
+func WithInitializationSignal(ctx context.Context, signal InitializationSignal) context.Context {
+ return context.WithValue(ctx, priorityAndFairnessInitializationSignalKey, signal)
+}
+
+// initializationSignalFrom returns the initialization signal stored in
+// the given context by WithInitializationSignal, if any, and reports
+// whether a non-nil signal was found.
+func initializationSignalFrom(ctx context.Context) (InitializationSignal, bool) {
+ signal, ok := ctx.Value(priorityAndFairnessInitializationSignalKey).(InitializationSignal)
+ return signal, ok && signal != nil
+}
+
+// WatchInitialized sends a signal to the priority and fairness dispatcher
+// that a given watch request has already been initialized.
+func WatchInitialized(ctx context.Context) {
+ if signal, ok := initializationSignalFrom(ctx); ok {
+ signal.Signal()
+ }
+}
+
+// InitializationSignal is an interface that allows sending and handling
+// initialization signals.
+type InitializationSignal interface {
+ // Signal notifies the dispatcher about finished initialization.
+ Signal()
+ // Wait waits for the initialization signal.
+ Wait()
+}
+
+type initializationSignal struct {
+ once sync.Once
+ done chan struct{}
+}
+
+func NewInitializationSignal() InitializationSignal {
+ return &initializationSignal{
+ once: sync.Once{},
+ done: make(chan struct{}),
+ }
+}
+
+func (i *initializationSignal) Signal() {
+ i.once.Do(func() { close(i.done) })
+}
+
+func (i *initializationSignal) Wait() {
+ <-i.done
+}

From d9d51541a87ec627160d7d6a1fcd4b357a0fa493 Mon Sep 17 00:00:00 2001
From: wojtekt
Date: Thu, 27 May 2021 14:49:54 +0200
Subject: [PATCH 2/3] Address watch panics in P&F handler and extend testing.

---
 .../server/filters/priority-and-fairness.go | 31 +++++++---
 .../filters/priority-and-fairness_test.go | 60 ++++++++++++++++++-
 2 files changed, 81 insertions(+), 10 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index bc589299f3f..13658377c67 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -19,12 +19,11 @@ package filters
 import (
 "fmt"
 "net/http"
- "sync"
+ "runtime"
 "sync/atomic"

 flowcontrol "k8s.io/api/flowcontrol/v1beta1"
 apitypes "k8s.io/apimachinery/pkg/types"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
 apirequest "k8s.io/apiserver/pkg/endpoints/request"
 utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
@@ -112,7 +111,7 @@ func WithPriorityAndFairness(
 waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
 }
 }
- wg := sync.WaitGroup{}
+ var resultCh chan interface{}
 execute := func() {
 noteExecutingDelta(1)
 defer noteExecutingDelta(-1)
@@ -130,11 +129,22 @@ func WithPriorityAndFairness(
 setResponseHeaders(classification, w)

 if isWatchRequest {
- wg.Add(1)
+ resultCh = make(chan interface{})
 go func() {
- defer utilruntime.HandleCrash()
+ defer func() {
+ err := recover()
+ // do not wrap the sentinel ErrAbortHandler panic value
+ if err != nil && err != http.ErrAbortHandler {
+ // Same as stdlib http server code. Manually allocate stack
+ // trace buffer size to prevent excessively large logs.
+ const size = 64 << 10
+ buf := make([]byte, size)
+ buf = buf[:runtime.Stack(buf, false)]
+ err = fmt.Sprintf("%v\n%s", err, buf)
+ }
+ resultCh <- err
+ }()

- defer wg.Done()
 // Protect from situations when the request will not reach the storage layer
 // and the initialization signal will not be sent.
defer watchInitializationSignal.Signal()
@@ -165,12 +175,17 @@
 }
 epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
 if isWatchRequest {
- wg.Done()
+ close(resultCh)
 }
 tooManyRequests(r, w)
 }
 // In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes.
- wg.Wait()
+ if isWatchRequest {
+ err := <-resultCh
+ if err != nil {
+ panic(err)
+ }
+ }
 })
}
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index 1e9a8537b16..376ab7ed389 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -147,6 +147,11 @@ func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEn
 }

 func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server {
+ apfServer := httptest.NewServer(newApfHandlerWithFilter(flowControlFilter, onExecute, postExecute, t))
+ return apfServer
+}
+
+func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) http.Handler {
 requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
 longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))

@@ -165,8 +170,7 @@ func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecu
 }
 }), requestInfoFactory)

- apfServer := httptest.NewServer(handler)
- return apfServer
+ return handler
}

func TestApfSkipLongRunningRequest(t *testing.T) {
@@ -469,6 +473,58 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
 wg.Wait()
}

+func TestApfWatchPanic(t *testing.T) {
+ fakeFilter := &fakeWatchApfFilter{
+ capacity: 1,
+ }
+
+ onExecuteFunc := func() {
+ panic("test panic")
+ }
+ postExecuteFunc := func() {}
+
+ apfHandler := newApfHandlerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
+ handler := func(w http.ResponseWriter, r *http.Request) {
+ defer func() {
+ if err := recover(); err == nil {
+ t.Errorf("expected panic, got %v", err)
+ }
+ }()
+ apfHandler.ServeHTTP(w, r)
+ }
+ server := httptest.NewServer(http.HandlerFunc(handler))
+ defer server.Close()
+
+ if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
+ t.Errorf("unexpected error: %v", err)
+ }
+}
+
+// TestContextClosesOnRequestProcessed ensures that the request context is cancelled
+// automatically even if the server doesn't cancel it explicitly.
+// This is required to ensure we won't be leaking goroutines that wait for context
+// cancelling (e.g. in queueset::StartRequest method).
+func TestContextClosesOnRequestProcessed(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(1) + handler := func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + // asynchronously wait for context being closed + go func() { + <-ctx.Done() + wg.Done() + }() + } + server := httptest.NewServer(http.HandlerFunc(handler)) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil { + t.Errorf("unexpected error: %v", err) + } + + wg.Wait() +} + func TestApfCancelWaitRequest(t *testing.T) { epmetrics.Register() From 8054b0f808d116658ac086e4b71fb34d1502cd57 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Wed, 2 Jun 2021 08:22:29 +0200 Subject: [PATCH 3/3] Fix watch rejections in P&F filter --- .../server/filters/priority-and-fairness.go | 11 +++- .../filters/priority-and-fairness_test.go | 50 +++++++++++++------ 2 files changed, 44 insertions(+), 17 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 13658377c67..23ea5b7287a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -112,6 +112,9 @@ func WithPriorityAndFairness( } } var resultCh chan interface{} + if isWatchRequest { + resultCh = make(chan interface{}) + } execute := func() { noteExecutingDelta(1) defer noteExecutingDelta(-1) @@ -129,7 +132,6 @@ func WithPriorityAndFairness( setResponseHeaders(classification, w) if isWatchRequest { - resultCh = make(chan interface{}) go func() { defer func() { err := recover() @@ -179,7 +181,12 @@ func WithPriorityAndFairness( } tooManyRequests(r, w) } - // In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes. + + // For watch requests, from the APF point of view the request is already + // finished at this point. However, that doesn't mean it is already finished + // from the non-APF point of view. 
So we need to wait here until the request is: + // 1) finished being processed or + // 2) rejected if isWatchRequest { err := <-resultCh if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 376ab7ed389..80ccaa35bf1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -112,7 +112,7 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error { func (t fakeApfFilter) Install(c *mux.PathRecorderMux) { } -func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server { +func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptest.Server { onExecuteFunc := func() { if decision == decisionCancelWait { t.Errorf("execute should not be invoked") @@ -134,24 +134,24 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptes t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) } } - return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) + return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc) } -func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server { +func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server { fakeFilter := fakeApfFilter{ mockDecision: decision, postEnqueue: postEnqueue, postDequeue: postDequeue, } - return newApfServerWithFilter(fakeFilter, onExecute, postExecute, t) + return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute) } -func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server { - apfServer := httptest.NewServer(newApfHandlerWithFilter(flowControlFilter, onExecute, postExecute, t)) +func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server { + apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute)) return apfServer } -func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) http.Handler { +func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler { requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) @@ -176,7 +176,7 @@ func newApfHandlerWithFilter(flowControlFilter utilflowcontrol.Interface, onExec func TestApfSkipLongRunningRequest(t *testing.T) { epmetrics.Register() - server := newApfServerWithSingleRequest(decisionSkipFilter, t) + server := newApfServerWithSingleRequest(t, decisionSkipFilter) defer server.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -193,7 +193,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) { func TestApfRejectRequest(t *testing.T) { epmetrics.Register() - server := newApfServerWithSingleRequest(decisionReject, t) + server := newApfServerWithSingleRequest(t, decisionReject) 
defer server.Close()

 ctx, cancel := context.WithCancel(context.Background())
@@ -218,7 +218,7 @@ func TestApfExemptRequest(t *testing.T) {
 // so that an observation will cause some data to go into the Prometheus metrics.
 time.Sleep(time.Millisecond * 50)

- server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
+ server := newApfServerWithSingleRequest(t, decisionNoQueuingExecute)
 defer server.Close()

 ctx, cancel := context.WithCancel(context.Background())
@@ -244,7 +244,7 @@ func TestApfExecuteRequest(t *testing.T) {
 // so that an observation will cause some data to go into the Prometheus metrics.
 time.Sleep(time.Millisecond * 50)

- server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
+ server := newApfServerWithSingleRequest(t, decisionQueuingExecute)
 defer server.Close()

 ctx, cancel := context.WithCancel(context.Background())
@@ -316,7 +316,7 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 finishExecute.Wait()
 }

- server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
+ server := newApfServerWithHooks(t, decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
 defer server.Close()

 ctx, cancel := context.WithCancel(context.Background())
@@ -444,7 +444,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {

 postExecuteFunc := func() {}

- server := newApfServerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
+ server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
 defer server.Close()

 var wg sync.WaitGroup
@@ -473,6 +473,24 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
 wg.Wait()
}

+func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
+ fakeFilter := &fakeWatchApfFilter{
+ capacity: 0,
+ }
+
+ onExecuteFunc := func() {
+ t.Errorf("Request unexpectedly executing")
+ }
+ postExecuteFunc := func() {}
+
+ server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
+ defer server.Close()
+
+ if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil {
+ t.Error(err)
+ }
+}
+
 func TestApfWatchPanic(t *testing.T) {
 fakeFilter := &fakeWatchApfFilter{
 capacity: 1,
 }
@@ -483,7 +501,7 @@ func TestApfWatchPanic(t *testing.T) {
 }
 postExecuteFunc := func() {}

- apfHandler := newApfHandlerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
+ apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
 handler := func(w http.ResponseWriter, r *http.Request) {
 defer func() {
 if err := recover(); err == nil {
@@ -504,6 +522,8 @@ func TestApfWatchPanic(t *testing.T) {
 // automatically even if the server doesn't cancel it explicitly.
 // This is required to ensure we won't be leaking goroutines that wait for context
 // cancelling (e.g. in queueset::StartRequest method).
+ // Even though production code does not use httptest.Server, this logic is shared
+ // between the two.
func TestContextClosesOnRequestProcessed(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) @@ -528,7 +548,7 @@ func TestContextClosesOnRequestProcessed(t *testing.T) { func TestApfCancelWaitRequest(t *testing.T) { epmetrics.Register() - server := newApfServerWithSingleRequest(decisionCancelWait, t) + server := newApfServerWithSingleRequest(t, decisionCancelWait) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
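
For readers following the series, the sketch below shows how the pieces introduced in PATCH 1 are meant to fit together end to end. It is a standalone illustration, not code from these patches: storageWatch is a hypothetical stand-in for the cacher/etcd3 watch paths, and only the utilflowcontrol calls mirror the API actually added in apf_context.go.

package main

import (
	"context"
	"fmt"

	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)

// storageWatch stands in for a storage-layer Watch implementation.
// It reports readiness the same way cacher.go / etcd3's watcher.go do
// in PATCH 1: by calling WatchInitialized on the request context.
func storageWatch(ctx context.Context) {
	// ... establish the watch ...
	utilflowcontrol.WatchInitialized(ctx)
	// ... stream events for as long as the request lives ...
}

func main() {
	// The filter side: create a signal, attach it to the context,
	// run the handler in a goroutine, and hold the APF seat only
	// until the watch reports that it is initialized.
	signal := utilflowcontrol.NewInitializationSignal()
	ctx := utilflowcontrol.WithInitializationSignal(context.Background(), signal)

	go storageWatch(ctx)

	signal.Wait() // returns once storageWatch calls WatchInitialized
	fmt.Println("watch initialized; the APF seat can be released")
}

Because Signal is guarded by sync.Once over a closed channel, it is safe for both the storage layer and the filter's own deferred Signal() to fire; Wait simply unblocks on the first of them.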
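PATCH 2's replacement of utilruntime.HandleCrash with resultCh is also worth spelling out, since the pattern is subtle: a panic in the per-watch goroutine must be re-raised on the request goroutine, or the server's usual recovery machinery would never see it. A minimal standalone sketch of that pattern follows (the real filter additionally passes the sentinel http.ErrAbortHandler through without wrapping, which this sketch omits):

package main

import (
	"fmt"
	"runtime"
)

func runWithPanicForwarding(work func()) {
	resultCh := make(chan interface{})
	go func() {
		defer func() {
			err := recover()
			if err != nil {
				// Same idea as the filter: attach a bounded stack trace
				// so the panic site is not lost when crossing goroutines.
				const size = 64 << 10
				buf := make([]byte, size)
				buf = buf[:runtime.Stack(buf, false)]
				err = fmt.Sprintf("%v\n%s", err, buf)
			}
			resultCh <- err // nil on clean completion
		}()
		work()
	}()

	if err := <-resultCh; err != nil {
		panic(err) // re-raise on the caller's goroutine
	}
}

func main() {
	defer func() {
		fmt.Println("recovered in caller:", recover() != nil)
	}()
	runWithPanicForwarding(func() { panic("test panic") })
}

This is also why PATCH 3 hoists the resultCh allocation out of execute(): when the dispatcher rejects the request and never calls execute(), the rejection path can still close the channel, so the final receive does not block forever.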