Implement support for watch initialization in P&F

This commit is contained in:
wojtekt 2021-05-07 12:49:06 +02:00
parent c08526c7f7
commit 0cc217647c
8 changed files with 326 additions and 24 deletions

View File

@ -47,7 +47,10 @@ const (
observationMaintenancePeriod = 10 * time.Second
)
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
var (
nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
watchVerbs = sets.NewString("watch")
)
func handleError(w http.ResponseWriter, r *http.Request, err error) {
errorMsg := fmt.Sprintf("Internal Server Error: %#v", r.RequestURI)

View File

@ -17,13 +17,14 @@ limitations under the License.
package filters
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
apitypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
@ -31,10 +32,6 @@ import (
"k8s.io/klog/v2"
)
type priorityAndFairnessKeyType int
const priorityAndFairnessKey priorityAndFairnessKeyType = iota
// PriorityAndFairnessClassification identifies the results of
// classification for API Priority and Fairness
type PriorityAndFairnessClassification struct {
@ -44,12 +41,6 @@ type PriorityAndFairnessClassification struct {
PriorityLevelUID apitypes.UID
}
// GetClassification returns the classification associated with the
// given context, if any, otherwise nil
func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
}
// waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{
phase: epmetrics.WaitingPhase,
@ -60,6 +51,9 @@ var waitingMark = &requestWatermark{
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
// newInitializationSignal is defined for testing purposes.
var newInitializationSignal = utilflowcontrol.NewInitializationSignal
// WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way.
func WithPriorityAndFairness(
@ -84,8 +78,10 @@ func WithPriorityAndFairness(
return
}
// Skip tracking long running requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
isWatchRequest := watchVerbs.Has(requestInfo.Verb)
// Skip tracking long running non-watch requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
handler.ServeHTTP(w, r)
return
@ -116,15 +112,40 @@ func WithPriorityAndFairness(
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
wg := sync.WaitGroup{}
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
innerReq := r.Clone(innerCtx)
innerCtx := ctx
innerReq := r
var watchInitializationSignal utilflowcontrol.InitializationSignal
if isWatchRequest {
watchInitializationSignal = newInitializationSignal()
innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
innerReq = r.Clone(innerCtx)
}
setResponseHeaders(classification, w)
handler.ServeHTTP(w, innerReq)
if isWatchRequest {
wg.Add(1)
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
// Protect from the situations when request will not reach storage layer
// and the initialization signal will not be send.
defer watchInitializationSignal.Signal()
handler.ServeHTTP(w, innerReq)
}()
watchInitializationSignal.Wait()
} else {
handler.ServeHTTP(w, innerReq)
}
}
digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user}
fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
@ -143,9 +164,13 @@ func WithPriorityAndFairness(
epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
}
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
if isWatchRequest {
wg.Done()
}
tooManyRequests(r, w)
}
// In case of watch, from P&F POV it already finished, but we need to wait until the request itself finishes.
wg.Wait()
})
}

View File

@ -138,16 +138,21 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptes
}
func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
fakeFilter := fakeApfFilter{
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
}
return newApfServerWithFilter(fakeFilter, onExecute, postExecute, t)
}
func newApfServerWithFilter(flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func(), t *testing.T) *httptest.Server {
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
onExecute()
}), longRunningRequestCheck, fakeApfFilter{
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
})
}), longRunningRequestCheck, flowControlFilter)
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@ -175,7 +180,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
// send a watch request to test skipping long running request
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/foos/foo/proxy", server.URL), http.StatusOK); err != nil {
// request should not be rejected
t.Error(err)
}
@ -334,6 +339,136 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
})
}
type fakeWatchApfFilter struct {
lock sync.Mutex
inflight int
capacity int
}
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
_ fq.QueueNoteFn,
execFn func(),
) {
canExecute := false
func() {
f.lock.Lock()
defer f.lock.Unlock()
if f.inflight < f.capacity {
f.inflight++
canExecute = true
}
}()
if !canExecute {
return
}
execFn()
f.lock.Lock()
defer f.lock.Unlock()
f.inflight--
}
func (f *fakeWatchApfFilter) MaintainObservations(stopCh <-chan struct{}) {
}
func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
return nil
}
func (t *fakeWatchApfFilter) Install(c *mux.PathRecorderMux) {
}
func (f *fakeWatchApfFilter) wait() error {
return wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
f.lock.Lock()
defer f.lock.Unlock()
return f.inflight == 0, nil
})
}
func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
signalsLock := sync.Mutex{}
signals := []utilflowcontrol.InitializationSignal{}
sendSignals := func() {
signalsLock.Lock()
defer signalsLock.Unlock()
for i := range signals {
signals[i].Signal()
}
signals = signals[:0]
}
newInitializationSignal = func() utilflowcontrol.InitializationSignal {
signalsLock.Lock()
defer signalsLock.Unlock()
signal := utilflowcontrol.NewInitializationSignal()
signals = append(signals, signal)
return signal
}
defer func() {
newInitializationSignal = utilflowcontrol.NewInitializationSignal
}()
// We test if initialization after receiving initialization signal the
// new requests will be allowed to run by:
// - sending N requests that will occupy the whole capacity
// - sending initialiation signals for them
// - ensuring that number of inflight requests will get to zero
concurrentRequests := 5
firstRunning := sync.WaitGroup{}
firstRunning.Add(concurrentRequests)
allRunning := sync.WaitGroup{}
allRunning.Add(2 * concurrentRequests)
fakeFilter := &fakeWatchApfFilter{
capacity: concurrentRequests,
}
onExecuteFunc := func() {
firstRunning.Done()
firstRunning.Wait()
sendSignals()
fakeFilter.wait()
allRunning.Done()
allRunning.Wait()
}
postExecuteFunc := func() {}
server := newApfServerWithFilter(fakeFilter, onExecuteFunc, postExecuteFunc, t)
defer server.Close()
var wg sync.WaitGroup
wg.Add(2 * concurrentRequests)
for i := 0; i < concurrentRequests; i++ {
go func() {
defer wg.Done()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
}()
}
firstRunning.Wait()
fakeFilter.wait()
firstRunning.Add(concurrentRequests)
for i := 0; i < concurrentRequests; i++ {
go func() {
defer wg.Done()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
}()
}
wg.Wait()
}
func TestApfCancelWaitRequest(t *testing.T) {
epmetrics.Register()

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
@ -1413,6 +1414,14 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
}
// At this point we already start processing incoming watch events.
// However, the init event can still be processed because their serialization
// and sending to the client happens asynchrnously.
// TODO: As describe in the KEP, we would like to estimate that by delaying
// the initialization signal proportionally to the number of events to
// process, but we're leaving this to the tuning phase.
utilflowcontrol.WatchInitialized(ctx)
defer close(c.result)
defer c.Stop()
for {

View File

@ -42,6 +42,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)
var (
@ -701,6 +702,26 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
}
}
func TestWatchInitializationSignal(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
initSignal := utilflowcontrol.NewInitializationSignal()
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
initSignal.Wait()
}
func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBookmarks bool) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog/v2"
@ -120,6 +121,14 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
}
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
go wc.run()
// For etcd watch we don't have an easy way to answer whether the watch
// has already caught up. So in the initial version (given that watchcache
// is by default enabled for all resources but Events), we just deliver
// the initialization signal immediately. Improving this will be explored
// in the future.
utilflowcontrol.WatchInitialized(ctx)
return wc, nil
}

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)
func TestWatch(t *testing.T) {
@ -313,6 +314,23 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
}
}
func TestWatchInitializationSignal(t *testing.T) {
_, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
initSignal := utilflowcontrol.NewInitializationSignal()
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
initSignal.Wait()
}
func TestProgressNotify(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
clusterConfig := &integration.ClusterConfig{

View File

@ -0,0 +1,82 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"sync"
)
type priorityAndFairnessKeyType int
const (
// priorityAndFairnessInitializationSignalKey is a key under which
// initialization signal function for watch requests is stored
// in the context.
priorityAndFairnessInitializationSignalKey priorityAndFairnessKeyType = iota
)
// WithInitializationSignal creates a copy of parent context with
// priority and fairness initialization signal value.
func WithInitializationSignal(ctx context.Context, signal InitializationSignal) context.Context {
return context.WithValue(ctx, priorityAndFairnessInitializationSignalKey, signal)
}
// initializationSignalFrom returns an initialization signal function
// which when called signals that watch initialization has already finished
// to priority and fairness dispatcher.
func initializationSignalFrom(ctx context.Context) (InitializationSignal, bool) {
signal, ok := ctx.Value(priorityAndFairnessInitializationSignalKey).(InitializationSignal)
return signal, ok && signal != nil
}
// WatchInitialized sends a signal to priority and fairness dispatcher
// that a given watch request has already been initialized.
func WatchInitialized(ctx context.Context) {
if signal, ok := initializationSignalFrom(ctx); ok {
signal.Signal()
}
}
// InitializationSignal is an interface that allows sending and handling
// initialization signals.
type InitializationSignal interface {
// Signal notifies the dispatcher about finished initialization.
Signal()
// Wait waits for the initialization signal.
Wait()
}
type initializationSignal struct {
once sync.Once
done chan struct{}
}
func NewInitializationSignal() InitializationSignal {
return &initializationSignal{
once: sync.Once{},
done: make(chan struct{}),
}
}
func (i *initializationSignal) Signal() {
i.once.Do(func() { close(i.done) })
}
func (i *initializationSignal) Wait() {
<-i.done
}