diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
index 2484bfc76c8..70c8d8b855c 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
@@ -47,7 +47,10 @@ const (
 	observationMaintenancePeriod = 10 * time.Second
 )
 
-var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
+var (
+	nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
+	watchVerbs              = sets.NewString("watch")
+)
 
 func handleError(w http.ResponseWriter, r *http.Request, err error) {
 	errorMsg := fmt.Sprintf("Internal Server Error: %#v", r.RequestURI)
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index 186824e2f26..bc589299f3f 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -17,13 +17,14 @@ limitations under the License.
 package filters
 
 import (
-	"context"
 	"fmt"
 	"net/http"
+	"sync"
 	"sync/atomic"
 
 	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
 	apitypes "k8s.io/apimachinery/pkg/types"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
 	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
@@ -31,10 +32,6 @@ import (
 	"k8s.io/klog/v2"
 )
 
-type priorityAndFairnessKeyType int
-
-const priorityAndFairnessKey priorityAndFairnessKeyType = iota
-
 // PriorityAndFairnessClassification identifies the results of
 // classification for API Priority and Fairness
 type PriorityAndFairnessClassification struct {
@@ -44,12 +41,6 @@ type PriorityAndFairnessClassification struct {
 	PriorityLevelUID apitypes.UID
 }
 
-// GetClassification returns the classification associated with the
-// given context, if any, otherwise nil
-func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
-	return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
-}
-
 // waitingMark tracks requests waiting rather than being executed
 var waitingMark = &requestWatermark{
 	phase: epmetrics.WaitingPhase,
@@ -60,6 +51,9 @@ var waitingMark = &requestWatermark{
 var atomicMutatingExecuting, atomicReadOnlyExecuting int32
 var atomicMutatingWaiting, atomicReadOnlyWaiting int32
 
+// newInitializationSignal is defined for testing purposes.
+var newInitializationSignal = utilflowcontrol.NewInitializationSignal
+
 // WithPriorityAndFairness limits the number of in-flight
 // requests in a fine-grained way.
 func WithPriorityAndFairness(
@@ -84,8 +78,10 @@ func WithPriorityAndFairness(
 			return
 		}
 
-		// Skip tracking long running requests.
-		if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
+		isWatchRequest := watchVerbs.Has(requestInfo.Verb)
+
+		// Skip tracking long running non-watch requests.
+		if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
 			klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
 			handler.ServeHTTP(w, r)
 			return
@@ -116,15 +112,40 @@ func WithPriorityAndFairness(
 				waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
 			}
 		}
+		wg := sync.WaitGroup{}
 		execute := func() {
 			noteExecutingDelta(1)
 			defer noteExecutingDelta(-1)
 			served = true
-			innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
-			innerReq := r.Clone(innerCtx)
+
+			innerCtx := ctx
+			innerReq := r
+
+			var watchInitializationSignal utilflowcontrol.InitializationSignal
+			if isWatchRequest {
+				watchInitializationSignal = newInitializationSignal()
+				innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
+				innerReq = r.Clone(innerCtx)
+			}
 			setResponseHeaders(classification, w)
-			handler.ServeHTTP(w, innerReq)
+			if isWatchRequest {
+				wg.Add(1)
+				go func() {
+					defer utilruntime.HandleCrash()
+
+					defer wg.Done()
+					// Protect from situations when the request never reaches the storage
+					// layer and the initialization signal would otherwise never be sent.
+					defer watchInitializationSignal.Signal()
+
+					handler.ServeHTTP(w, innerReq)
+				}()
+
+				watchInitializationSignal.Wait()
+			} else {
+				handler.ServeHTTP(w, innerReq)
+			}
 		}
 
 		digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user}
 		fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
@@ -143,9 +164,13 @@ func WithPriorityAndFairness(
 				epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
 			}
 			epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
+			if isWatchRequest {
+				wg.Done()
+			}
 			tooManyRequests(r, w)
 		}
-
+		// In case of watch, from the P&F point of view the request already finished, but we still need to wait until the request itself completes.
+		wg.Wait()
 	})
 }
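
This hunk is the heart of the change: a watch handler is moved to its own goroutine, the priority-and-fairness seat is held only until the storage layer signals that watch initialization has finished, and the filter itself returns only after the handler does. The following standalone sketch models that control flow; all names here are hypothetical, and the crash handling (utilruntime.HandleCrash) and metrics of the real filter are omitted:

package main

import (
	"fmt"
	"sync"
	"time"
)

// initSignal is a stand-in for the InitializationSignal type that
// apf_context.go adds further down in this patch.
type initSignal struct {
	once sync.Once
	done chan struct{}
}

func (s *initSignal) Signal() { s.once.Do(func() { close(s.done) }) }
func (s *initSignal) Wait()   { <-s.done }

// serveWatch mimics the execute closure above: the handler runs in its own
// goroutine, the P&F seat is held only until the watch reports that it is
// initialized, and the filter waits for the handler before returning.
func serveWatch(handler func(signal *initSignal)) {
	wg := sync.WaitGroup{}

	execute := func() {
		signal := &initSignal{done: make(chan struct{})}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Fire the signal even if the handler returns (or panics)
			// before it ever reaches the storage layer.
			defer signal.Signal()
			handler(signal)
		}()
		// The concurrency slot is occupied only for this Wait.
		signal.Wait()
	}

	execute() // in the real filter this runs under fcIfc.Handle
	fmt.Println("seat released; watch handler may still be running")
	wg.Wait() // the request must outlive the P&F decision
}

func main() {
	serveWatch(func(signal *initSignal) {
		time.Sleep(10 * time.Millisecond) // stand-in for cache sync
		signal.Signal()                   // what WatchInitialized does
		time.Sleep(50 * time.Millisecond) // long-lived streaming phase
	})
}

The deferred Signal() mirrors the production code's safety net: even when the handler exits before reaching the storage layer, the seat is still released.
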
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index 0ef6345f34b..1e9a8537b16 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -138,16 +138,21 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptes
 }
 
 func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
+	fakeFilter := fakeApfFilter{
+		mockDecision: decision,
+		postEnqueue:  postEnqueue,
+		postDequeue:  postDequeue,
+	}
+	return newApfServerWithFilter(fakeFilter, onExecute, postExecute, t)
+}
+
+func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server {
 	requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
 
 	apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		onExecute()
-	}), longRunningRequestCheck, fakeApfFilter{
-		mockDecision: decision,
-		postEnqueue:  postEnqueue,
-		postDequeue:  postDequeue,
-	})
+	}), longRunningRequestCheck, flowControlFilter)
 
 	handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@@ -175,7 +180,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
 	StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
 
-	// send a watch request to test skipping long running request
-	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
+	// send a proxy request to test skipping long running request
+	if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/foos/foo/proxy", server.URL), http.StatusOK); err != nil {
 		// request should not be rejected
 		t.Error(err)
 	}
@@ -334,6 +339,136 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
 	})
 }
 
+type fakeWatchApfFilter struct {
+	lock     sync.Mutex
+	inflight int
+	capacity int
+}
+
+func (f *fakeWatchApfFilter) Handle(ctx context.Context,
+	requestDigest utilflowcontrol.RequestDigest,
+	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
+	_ fq.QueueNoteFn,
+	execFn func(),
+) {
+	canExecute := false
+	func() {
+		f.lock.Lock()
+		defer f.lock.Unlock()
+		if f.inflight < f.capacity {
+			f.inflight++
+			canExecute = true
+		}
+	}()
+	if !canExecute {
+		return
+	}
+
+	execFn()
+
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.inflight--
+}
+
+func (f *fakeWatchApfFilter) MaintainObservations(stopCh <-chan struct{}) {
+}
+
+func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
+	return nil
+}
+
+func (f *fakeWatchApfFilter) Install(c *mux.PathRecorderMux) {
+}
+
+func (f *fakeWatchApfFilter) wait() error {
+	return wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+		f.lock.Lock()
+		defer f.lock.Unlock()
+		return f.inflight == 0, nil
+	})
+}
+
+func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
+	signalsLock := sync.Mutex{}
+	signals := []utilflowcontrol.InitializationSignal{}
+	sendSignals := func() {
+		signalsLock.Lock()
+		defer signalsLock.Unlock()
+		for i := range signals {
+			signals[i].Signal()
+		}
+		signals = signals[:0]
+	}
+
+	newInitializationSignal = func() utilflowcontrol.InitializationSignal {
+		signalsLock.Lock()
+		defer signalsLock.Unlock()
+		signal := utilflowcontrol.NewInitializationSignal()
+		signals = append(signals, signal)
+		return signal
+	}
+	defer func() {
+		newInitializationSignal = utilflowcontrol.NewInitializationSignal
+	}()
+
+	// We test that new requests are allowed to run after the initialization
+	// signal has been received, by:
+	// - sending N requests that will occupy the whole capacity
+	// - sending initialization signals for them
+	// - ensuring that the number of in-flight requests drops to zero
+	concurrentRequests := 5
+	firstRunning := sync.WaitGroup{}
+	firstRunning.Add(concurrentRequests)
+	allRunning := sync.WaitGroup{}
+	allRunning.Add(2 * concurrentRequests)
+
+	fakeFilter := &fakeWatchApfFilter{
+		capacity: concurrentRequests,
+	}
+
+	onExecuteFunc := func() {
+		firstRunning.Done()
+		firstRunning.Wait()
+
+		sendSignals()
+		fakeFilter.wait()
+
+		allRunning.Done()
+		allRunning.Wait()
+	}
+
+	postExecuteFunc := func() {}
+
+	server := newApfServerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
+	defer server.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(2 * concurrentRequests)
+	for i := 0; i < concurrentRequests; i++ {
+		go func() {
+			defer wg.Done()
+			if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
+				t.Error(err)
+			}
+		}()
+	}
+
+	firstRunning.Wait()
+	fakeFilter.wait()
+
+	firstRunning.Add(concurrentRequests)
+	for i := 0; i < concurrentRequests; i++ {
+		go func() {
+			defer wg.Done()
+			if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
+				t.Error(err)
+			}
+		}()
+	}
+	wg.Wait()
+}
+
 func TestApfCancelWaitRequest(t *testing.T) {
 	epmetrics.Register()
 
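
TestApfExecuteWatchRequestsWithInitializationSignal works because newInitializationSignal is a package-level function variable rather than a direct call: production code goes through the variable, and the test swaps it to capture every signal the filter creates, then fires them at a moment it controls. A minimal sketch of that test seam follows; all names are illustrative, not part of the patch:

package main

import (
	"fmt"
	"sync"
)

// newSignal plays the role of newInitializationSignal: a package-level
// function variable that production code calls and tests replace.
var newSignal = func() chan struct{} { return make(chan struct{}) }

func main() {
	var (
		mu      sync.Mutex
		created []chan struct{}
	)

	// What the test does: wrap the constructor to record every signal...
	orig := newSignal
	newSignal = func() chan struct{} {
		mu.Lock()
		defer mu.Unlock()
		ch := orig()
		created = append(created, ch)
		return ch
	}
	defer func() { newSignal = orig }()

	// ...let the code under test create signals through the seam...
	ch := newSignal()

	// ...and release them all at a moment the test controls,
	// the way sendSignals() does above.
	mu.Lock()
	for _, c := range created {
		close(c)
	}
	created = created[:0]
	mu.Unlock()

	<-ch
	fmt.Println("signal delivered")
}
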
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 23ffc3ae31e..39e8db0b767 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/storage"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
 	utiltrace "k8s.io/utils/trace"
@@ -1413,6 +1414,14 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
 		klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
 	}
 
+	// At this point we already start processing incoming watch events.
+	// However, the init events may still be being processed, because their
+	// serialization and sending to the client happen asynchronously.
+	// TODO: As described in the KEP, we would like to estimate that by delaying
+	//   the initialization signal proportionally to the number of events to
+	//   process, but we're leaving this to the tuning phase.
+	utilflowcontrol.WatchInitialized(ctx)
+
 	defer close(c.result)
 	defer c.Stop()
 	for {
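
The comment in this hunk is subtle: the signal only guarantees that the init events have been queued, not that they have been written to the client, because serialization and sending happen in a separate goroutine. A toy model of that ordering is sketched below (hypothetical names; a plain channel stands in for cacheWatcher.result):

package main

import (
	"fmt"
	"time"
)

func main() {
	// result models cacheWatcher.result; initialized models the signal.
	result := make(chan string, 10)
	initialized := make(chan struct{})

	// Producer: the analogue of process() above. Init events are queued
	// first, then the signal fires, then live events follow.
	go func() {
		for i := 0; i < 3; i++ {
			result <- fmt.Sprintf("init-%d", i)
		}
		close(initialized) // WatchInitialized(ctx)
		result <- "live-0"
		close(result)
	}()

	// The P&F filter would unblock here...
	<-initialized
	fmt.Println("signal received; init events may still be in flight")

	// ...while serialization and sending still happen asynchronously.
	for ev := range result {
		time.Sleep(5 * time.Millisecond)
		fmt.Println("sent", ev)
	}
}
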
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index f3a6c08634b..69fc6e38e3c 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -42,6 +42,7 @@ import (
 	"k8s.io/apiserver/pkg/apis/example"
 	examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
 	"k8s.io/apiserver/pkg/storage"
+	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 )
 
 var (
@@ -701,6 +702,27 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
 	}
 }
 
+func TestWatchInitializationSignal(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _, err := newTestCacher(backingStorage)
+	if err != nil {
+		t.Fatalf("Couldn't create cacher: %v", err)
+	}
+	defer cacher.Stop()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	initSignal := utilflowcontrol.NewInitializationSignal()
+	ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
+
+	_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
+	if err != nil {
+		t.Fatalf("Failed to create watch: %v", err)
+	}
+
+	initSignal.Wait()
+}
+
 func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBookmarks bool) {
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
index bd87382e83f..0a6f4bc3c15 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/etcd3/metrics"
 	"k8s.io/apiserver/pkg/storage/value"
+	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 
 	"go.etcd.io/etcd/clientv3"
 	"k8s.io/klog/v2"
@@ -120,6 +121,14 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
 	}
 	wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
 	go wc.run()
+
+	// For etcd watch we don't have an easy way to answer whether the watch
+	// has already caught up. So in the initial version (given that watchcache
+	// is by default enabled for all resources but Events), we just deliver
+	// the initialization signal immediately. Improving this will be explored
+	// in the future.
+	utilflowcontrol.WatchInitialized(ctx)
+
 	return wc, nil
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
index a141aaafc38..89a5e7b3f82 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/apiserver/pkg/apis/example"
 	examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
 	"k8s.io/apiserver/pkg/storage"
+	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
 )
 
 func TestWatch(t *testing.T) {
@@ -313,6 +314,24 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
 	}
 }
 
+func TestWatchInitializationSignal(t *testing.T) {
+	_, store, cluster := testSetup(t)
+	defer cluster.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	initSignal := utilflowcontrol.NewInitializationSignal()
+	ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
+
+	key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
+	_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
+	if err != nil {
+		t.Fatalf("Watch failed: %v", err)
+	}
+
+	initSignal.Wait()
+}
+
 func TestProgressNotify(t *testing.T) {
 	codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
 	clusterConfig := &integration.ClusterConfig{
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go
new file mode 100644
index 00000000000..6497e3fff5e
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_context.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package flowcontrol
+
+import (
+	"context"
+	"sync"
+)
+
+type priorityAndFairnessKeyType int
+
+const (
+	// priorityAndFairnessInitializationSignalKey is a key under which the
+	// initialization signal for watch requests is stored in the context.
+	priorityAndFairnessInitializationSignalKey priorityAndFairnessKeyType = iota
+)
+
+// WithInitializationSignal creates a copy of parent context with
+// priority and fairness initialization signal value.
+func WithInitializationSignal(ctx context.Context, signal InitializationSignal) context.Context {
+	return context.WithValue(ctx, priorityAndFairnessInitializationSignalKey, signal)
+}
+
+// initializationSignalFrom returns the initialization signal stored in the
+// given context, if any; it is used to notify the priority and fairness
+// dispatcher that watch initialization has finished.
+func initializationSignalFrom(ctx context.Context) (InitializationSignal, bool) {
+	signal, ok := ctx.Value(priorityAndFairnessInitializationSignalKey).(InitializationSignal)
+	return signal, ok && signal != nil
+}
+
+// WatchInitialized sends a signal to the priority and fairness dispatcher
+// that a given watch request has been initialized.
+func WatchInitialized(ctx context.Context) {
+	if signal, ok := initializationSignalFrom(ctx); ok {
+		signal.Signal()
+	}
+}
+
+// InitializationSignal is an interface that allows sending and handling
+// initialization signals.
+type InitializationSignal interface {
+	// Signal notifies the dispatcher about finished initialization.
+	Signal()
+	// Wait waits for the initialization signal.
+	Wait()
+}
+
+type initializationSignal struct {
+	once sync.Once
+	done chan struct{}
+}
+
+// NewInitializationSignal returns a signal that can be delivered at most
+// once; subsequent Signal calls are no-ops.
+func NewInitializationSignal() InitializationSignal {
+	return &initializationSignal{
+		once: sync.Once{},
+		done: make(chan struct{}),
+	}
+}
+
+func (i *initializationSignal) Signal() {
+	i.once.Do(func() { close(i.done) })
+}
+
+func (i *initializationSignal) Wait() {
+	<-i.done
+}
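
Putting the pieces together, this is roughly how the new API is meant to be used end to end: the filter creates the signal and waits on it, while the storage layer reports initialization through the context. The snippet below is a usage sketch using only the helpers this file adds, not additional apiserver code:

package main

import (
	"context"
	"fmt"
	"time"

	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)

func main() {
	// Filter side: create a signal and attach it to the request context.
	signal := utilflowcontrol.NewInitializationSignal()
	ctx := utilflowcontrol.WithInitializationSignal(context.Background(), signal)

	// Storage side: report when the watch has been initialized.
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for syncing the watch
		utilflowcontrol.WatchInitialized(ctx)
	}()

	// The filter blocks here before releasing its concurrency slot.
	signal.Wait()
	fmt.Println("watch initialized")

	// Signal is idempotent thanks to sync.Once, and WatchInitialized on a
	// context without a signal is a no-op, so storage layers can call it
	// unconditionally (as the etcd3 watcher above does).
	signal.Signal()
	utilflowcontrol.WatchInitialized(context.Background())
}
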