Merge pull request #102171 from wojtek-t/pf_watch_initialization_support

Implement support for watch initialization in P&F
Kubernetes Prow Robot authored 2021-06-02 08:02:06 -07:00, committed by GitHub
commit 894f603655
8 changed files with 434 additions and 34 deletions
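In summary, this change teaches API Priority and Fairness (APF) to account for watch requests: instead of skipping them as long-running, the filter occupies a concurrency seat only until the storage layer signals that the watch has initialized, while the handler itself keeps running in its own goroutine. Below is a rough end-to-end sketch of that flow using the helpers added by this commit; the server wiring is simplified, and the combined wait at the end is illustrative (in the real filter, the wait for handler completion happens in the Handle callback):

```go
package main

import (
	"net/http"
	"net/http/httptest"

	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)

// sketchWatchExecution mirrors the execute() closure added to the filter:
// the inner handler runs on its own goroutine, and the filter only blocks
// until the watch-initialization signal fires.
func sketchWatchExecution(handler http.Handler, w http.ResponseWriter, r *http.Request) {
	signal := utilflowcontrol.NewInitializationSignal()
	innerReq := r.Clone(utilflowcontrol.WithInitializationSignal(r.Context(), signal))

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Signal unconditionally so the filter never blocks forever,
		// even if the request never reaches the storage layer.
		defer signal.Signal()
		handler.ServeHTTP(w, innerReq)
	}()

	signal.Wait() // the APF seat could be released here...
	<-done        // ...while the watch keeps streaming until it finishes
}

func main() {
	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The storage layer calls this once the watch has caught up.
		utilflowcontrol.WatchInitialized(r.Context())
	})
	req := httptest.NewRequest("GET", "/api/v1/pods?watch=true", nil)
	sketchWatchExecution(h, httptest.NewRecorder(), req)
}
```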

View File

@@ -47,7 +47,10 @@ const (
observationMaintenancePeriod = 10 * time.Second
)
var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
var (
nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")
watchVerbs = sets.NewString("watch")
)
func handleError(w http.ResponseWriter, r *http.Request, err error) {
errorMsg := fmt.Sprintf("Internal Server Error: %#v", r.RequestURI)

View File

@@ -17,9 +17,9 @@ limitations under the License.
package filters
import (
"context"
"fmt"
"net/http"
"runtime"
"sync/atomic"
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
@@ -31,10 +31,6 @@ import (
"k8s.io/klog/v2"
)
type priorityAndFairnessKeyType int
const priorityAndFairnessKey priorityAndFairnessKeyType = iota
// PriorityAndFairnessClassification identifies the results of
// classification for API Priority and Fairness
type PriorityAndFairnessClassification struct {
@@ -44,12 +40,6 @@ type PriorityAndFairnessClassification struct {
PriorityLevelUID apitypes.UID
}
// GetClassification returns the classification associated with the
// given context, if any, otherwise nil
func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
}
// waitingMark tracks requests waiting rather than being executed
var waitingMark = &requestWatermark{
phase: epmetrics.WaitingPhase,
@@ -60,6 +50,9 @@ var waitingMark = &requestWatermark{
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
// newInitializationSignal is defined for testing purposes.
var newInitializationSignal = utilflowcontrol.NewInitializationSignal
// WithPriorityAndFairness limits the number of in-flight
// requests in a fine-grained way.
func WithPriorityAndFairness(
@@ -84,8 +77,10 @@ func WithPriorityAndFairness(
return
}
// Skip tracking long running requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
isWatchRequest := watchVerbs.Has(requestInfo.Verb)
// Skip tracking long running non-watch requests.
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
handler.ServeHTTP(w, r)
return
@@ -116,15 +111,53 @@ func WithPriorityAndFairness(
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
}
}
var resultCh chan interface{}
if isWatchRequest {
resultCh = make(chan interface{})
}
execute := func() {
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
served = true
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
innerReq := r.Clone(innerCtx)
innerCtx := ctx
innerReq := r
var watchInitializationSignal utilflowcontrol.InitializationSignal
if isWatchRequest {
watchInitializationSignal = newInitializationSignal()
innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
innerReq = r.Clone(innerCtx)
}
setResponseHeaders(classification, w)
handler.ServeHTTP(w, innerReq)
if isWatchRequest {
go func() {
defer func() {
err := recover()
// do not wrap the sentinel ErrAbortHandler panic value
if err != nil && err != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err = fmt.Sprintf("%v\n%s", err, buf)
}
resultCh <- err
}()
// Protect from situations when the request will not reach the storage layer
// and the initialization signal will not be sent.
defer watchInitializationSignal.Signal()
handler.ServeHTTP(w, innerReq)
}()
watchInitializationSignal.Wait()
} else {
handler.ServeHTTP(w, innerReq)
}
}
digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user}
fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
@@ -143,9 +176,23 @@ func WithPriorityAndFairness(
epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
}
epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
if isWatchRequest {
close(resultCh)
}
tooManyRequests(r, w)
}
// For watch requests, from the APF point of view the request is already
// finished at this point. However, that doesn't mean it is already finished
// from the non-APF point of view. So we need to wait here until the request is:
// 1) finished being processed or
// 2) rejected
if isWatchRequest {
err := <-resultCh
if err != nil {
panic(err)
}
}
})
}
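The goroutine wrapper above also forwards handler panics back to the filter goroutine via resultCh, so net/http's normal panic handling still applies (with http.ErrAbortHandler deliberately passed through unwrapped). The pattern in isolation, as a minimal self-contained sketch with illustrative names:

```go
package main

import (
	"fmt"
	"runtime"
)

// runForwardingPanics executes fn on a worker goroutine and re-raises any
// panic on the calling goroutine, attaching the worker's stack trace the
// same way the stdlib http server does.
func runForwardingPanics(fn func()) {
	resultCh := make(chan interface{})
	go func() {
		defer func() {
			err := recover()
			if err != nil {
				const size = 64 << 10 // cap the stack dump at 64 KiB
				buf := make([]byte, size)
				buf = buf[:runtime.Stack(buf, false)]
				err = fmt.Sprintf("%v\n%s", err, buf)
			}
			resultCh <- err // nil on clean completion
		}()
		fn()
	}()
	if err := <-resultCh; err != nil {
		panic(err) // re-raise where the caller's recovery machinery can see it
	}
}

func main() {
	defer func() { fmt.Println("recovered:", recover()) }()
	runForwardingPanics(func() { panic("boom") })
}
```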

View File

@@ -112,7 +112,7 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error {
func (t fakeApfFilter) Install(c *mux.PathRecorderMux) {
}
func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server {
func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptest.Server {
onExecuteFunc := func() {
if decision == decisionCancelWait {
t.Errorf("execute should not be invoked")
@@ -134,20 +134,30 @@ func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server {
t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting)
}
}
return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
return newApfServerWithHooks(t, decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
}
func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server {
fakeFilter := fakeApfFilter{
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
}
return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
}
func newApfServerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) *httptest.Server {
apfServer := httptest.NewServer(newApfHandlerWithFilter(t, flowControlFilter, onExecute, postExecute))
return apfServer
}
func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Interface, onExecute, postExecute func()) http.Handler {
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
onExecute()
}), longRunningRequestCheck, fakeApfFilter{
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
})
}), longRunningRequestCheck, flowControlFilter)
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@@ -160,14 +170,13 @@ func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server {
}
}), requestInfoFactory)
apfServer := httptest.NewServer(handler)
return apfServer
return handler
}
func TestApfSkipLongRunningRequest(t *testing.T) {
epmetrics.Register()
server := newApfServerWithSingleRequest(decisionSkipFilter, t)
server := newApfServerWithSingleRequest(t, decisionSkipFilter)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -175,7 +184,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
StartPriorityAndFairnessWatermarkMaintenance(ctx.Done())
// send a watch request to test skipping long running request
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil {
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/foos/foo/proxy", server.URL), http.StatusOK); err != nil {
// request should not be rejected
t.Error(err)
}
@@ -184,7 +193,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) {
func TestApfRejectRequest(t *testing.T) {
epmetrics.Register()
server := newApfServerWithSingleRequest(decisionReject, t)
server := newApfServerWithSingleRequest(t, decisionReject)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -209,7 +218,7 @@ func TestApfExemptRequest(t *testing.T) {
// so that an observation will cause some data to go into the Prometheus metrics.
time.Sleep(time.Millisecond * 50)
server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t)
server := newApfServerWithSingleRequest(t, decisionNoQueuingExecute)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -235,7 +244,7 @@ func TestApfExecuteRequest(t *testing.T) {
// so that an observation will cause some data to go into the Prometheus metrics.
time.Sleep(time.Millisecond * 50)
server := newApfServerWithSingleRequest(decisionQueuingExecute, t)
server := newApfServerWithSingleRequest(t, decisionQueuingExecute)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -307,7 +316,7 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
finishExecute.Wait()
}
server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t)
server := newApfServerWithHooks(t, decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc)
defer server.Close()
ctx, cancel := context.WithCancel(context.Background())
@@ -334,10 +343,212 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
})
}
type fakeWatchApfFilter struct {
lock sync.Mutex
inflight int
capacity int
}
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
_ fq.QueueNoteFn,
execFn func(),
) {
canExecute := false
func() {
f.lock.Lock()
defer f.lock.Unlock()
if f.inflight < f.capacity {
f.inflight++
canExecute = true
}
}()
if !canExecute {
return
}
execFn()
f.lock.Lock()
defer f.lock.Unlock()
f.inflight--
}
func (f *fakeWatchApfFilter) MaintainObservations(stopCh <-chan struct{}) {
}
func (f *fakeWatchApfFilter) Run(stopCh <-chan struct{}) error {
return nil
}
func (f *fakeWatchApfFilter) Install(c *mux.PathRecorderMux) {
}
func (f *fakeWatchApfFilter) wait() error {
return wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
f.lock.Lock()
defer f.lock.Unlock()
return f.inflight == 0, nil
})
}
func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
signalsLock := sync.Mutex{}
signals := []utilflowcontrol.InitializationSignal{}
sendSignals := func() {
signalsLock.Lock()
defer signalsLock.Unlock()
for i := range signals {
signals[i].Signal()
}
signals = signals[:0]
}
newInitializationSignal = func() utilflowcontrol.InitializationSignal {
signalsLock.Lock()
defer signalsLock.Unlock()
signal := utilflowcontrol.NewInitializationSignal()
signals = append(signals, signal)
return signal
}
defer func() {
newInitializationSignal = utilflowcontrol.NewInitializationSignal
}()
// We test that new requests are allowed to run after the initialization
// signal is received, by:
// - sending N requests that will occupy the whole capacity
// - sending initialization signals for them
// - ensuring that the number of inflight requests drops to zero
concurrentRequests := 5
firstRunning := sync.WaitGroup{}
firstRunning.Add(concurrentRequests)
allRunning := sync.WaitGroup{}
allRunning.Add(2 * concurrentRequests)
fakeFilter := &fakeWatchApfFilter{
capacity: concurrentRequests,
}
onExecuteFunc := func() {
firstRunning.Done()
firstRunning.Wait()
sendSignals()
fakeFilter.wait()
allRunning.Done()
allRunning.Wait()
}
postExecuteFunc := func() {}
server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
defer server.Close()
var wg sync.WaitGroup
wg.Add(2 * concurrentRequests)
for i := 0; i < concurrentRequests; i++ {
go func() {
defer wg.Done()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
}()
}
firstRunning.Wait()
fakeFilter.wait()
firstRunning.Add(concurrentRequests)
for i := 0; i < concurrentRequests; i++ {
go func() {
defer wg.Done()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Error(err)
}
}()
}
wg.Wait()
}
func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
fakeFilter := &fakeWatchApfFilter{
capacity: 0,
}
onExecuteFunc := func() {
t.Errorf("Request unexepectedly executing")
}
postExecuteFunc := func() {}
server := newApfServerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusTooManyRequests); err != nil {
t.Error(err)
}
}
func TestApfWatchPanic(t *testing.T) {
fakeFilter := &fakeWatchApfFilter{
capacity: 1,
}
onExecuteFunc := func() {
panic("test panic")
}
postExecuteFunc := func() {}
apfHandler := newApfHandlerWithFilter(t, fakeFilter, onExecuteFunc, postExecuteFunc)
handler := func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err == nil {
t.Errorf("expected panic, got %v", err)
}
}()
apfHandler.ServeHTTP(w, r)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
// TestContextClosesOnRequestProcessed ensures that the request context is cancelled
// automatically even if the server doesn't cancel it explicitly.
// This is required to ensure we won't be leaking goroutines that wait for context
// cancellation (e.g. in the queueset::StartRequest method).
// Even though in production we are not using httptest.Server, this logic is shared
// between the two.
func TestContextClosesOnRequestProcessed(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
handler := func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// asynchronously wait for context being closed
go func() {
<-ctx.Done()
wg.Done()
}()
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default/pods?watch=true", server.URL), http.StatusOK); err != nil {
t.Errorf("unexpected error: %v", err)
}
wg.Wait()
}
func TestApfCancelWaitRequest(t *testing.T) {
epmetrics.Register()
server := newApfServerWithSingleRequest(decisionCancelWait, t)
server := newApfServerWithSingleRequest(t, decisionCancelWait)
defer server.Close()
if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
@@ -1413,6 +1414,14 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
}
// At this point we already start processing incoming watch events.
// However, the init events can still be being processed, because their
// serialization and sending to the client happen asynchronously.
// TODO: As described in the KEP, we would like to estimate that by delaying
// the initialization signal proportionally to the number of events to
// process, but we're leaving this for the tuning phase.
utilflowcontrol.WatchInitialized(ctx)
defer close(c.result)
defer c.Stop()
for {

View File

@ -42,6 +42,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)
var (
@@ -701,6 +702,26 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
}
}
func TestWatchInitializationSignal(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
initSignal := utilflowcontrol.NewInitializationSignal()
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
initSignal.Wait()
}
func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBookmarks bool) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog/v2"
@@ -120,6 +121,14 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
}
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
go wc.run()
// For etcd watch we don't have an easy way to answer whether the watch
// has already caught up. So in the initial version (given that watchcache
// is by default enabled for all resources but Events), we just deliver
// the initialization signal immediately. Improving this will be explored
// in the future.
utilflowcontrol.WatchInitialized(ctx)
return wc, nil
}

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)
func TestWatch(t *testing.T) {
@@ -313,6 +314,23 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
}
}
func TestWatchInitializationSignal(t *testing.T) {
_, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
initSignal := utilflowcontrol.NewInitializationSignal()
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
_, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
initSignal.Wait()
}
func TestProgressNotify(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
clusterConfig := &integration.ClusterConfig{

View File

@@ -0,0 +1,82 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"sync"
)
type priorityAndFairnessKeyType int
const (
// priorityAndFairnessInitializationSignalKey is a key under which the
// initialization signal for watch requests is stored in the context.
priorityAndFairnessInitializationSignalKey priorityAndFairnessKeyType = iota
)
// WithInitializationSignal creates a copy of the parent context with the
// priority and fairness initialization signal value attached.
func WithInitializationSignal(ctx context.Context, signal InitializationSignal) context.Context {
return context.WithValue(ctx, priorityAndFairnessInitializationSignalKey, signal)
}
// initializationSignalFrom returns the initialization signal stored in the
// given context, which is used to signal to the priority and fairness
// dispatcher that watch initialization has finished.
func initializationSignalFrom(ctx context.Context) (InitializationSignal, bool) {
signal, ok := ctx.Value(priorityAndFairnessInitializationSignalKey).(InitializationSignal)
return signal, ok && signal != nil
}
// WatchInitialized signals to the priority and fairness dispatcher
// that the given watch request has been initialized.
func WatchInitialized(ctx context.Context) {
if signal, ok := initializationSignalFrom(ctx); ok {
signal.Signal()
}
}
// InitializationSignal is an interface that allows sending and handling
// initialization signals.
type InitializationSignal interface {
// Signal notifies the dispatcher about finished initialization.
Signal()
// Wait waits for the initialization signal.
Wait()
}
type initializationSignal struct {
once sync.Once
done chan struct{}
}
func NewInitializationSignal() InitializationSignal {
return &initializationSignal{
once: sync.Once{},
done: make(chan struct{}),
}
}
func (i *initializationSignal) Signal() {
i.once.Do(func() { close(i.done) })
}
func (i *initializationSignal) Wait() {
<-i.done
}