Optimize APF support for watch initialization to fix the pod startup time regression.

wojtekt 2021-07-13 10:22:30 +02:00
parent 43c9e8e7a1
commit ef435b85b4
3 changed files with 145 additions and 89 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
 package filters
 
 import (
+    "context"
     "fmt"
     "net/http"
     "runtime"
@@ -113,30 +114,76 @@ func WithPriorityAndFairness(
                 waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
             }
         }
-        var resultCh chan interface{}
-        var forgetWatch utilflowcontrol.ForgetWatchFunc
-        if isWatchRequest {
-            resultCh = make(chan interface{})
-        }
+        queueNote := func(inQueue bool) {
+            if inQueue {
+                noteWaitingDelta(1)
+            } else {
+                noteWaitingDelta(-1)
+            }
+        }
+
+        // find the estimated "width" of the request
+        // TODO: Maybe just make it costEstimator and let it return additionalLatency too for the watch?
+        // TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
+        width := widthEstimator.EstimateWidth(r)
+        digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width}
+
+        if isWatchRequest {
+            // This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
+            // If APF rejects the request, it is never closed.
+            shouldStartWatchCh := make(chan struct{})
+
+            watchInitializationSignal := newInitializationSignal()
+            // This wraps the request passed to handler.ServeHTTP(),
+            // setting a context that plumbs watchInitializationSignal to storage.
+            var watchReq *http.Request
+            // This is set inside execute(), prior to closing shouldStartWatchCh.
+            // If the request is rejected by APF it is left nil.
+            var forgetWatch utilflowcontrol.ForgetWatchFunc
+
+            defer func() {
+                // Protect from the situation when the request will not reach the storage layer
+                // and the initialization signal will not be sent.
+                if watchInitializationSignal != nil {
+                    watchInitializationSignal.Signal()
+                }
+                // Forget the watcher if it was registered.
+                //
+                // This is race-free because by this point, one of the following occurred:
+                // case <-shouldStartWatchCh: execute() completed the assignment to forgetWatch
+                // case <-resultCh: Handle() completed, and Handle() does not return
+                //   while execute() is running
+                if forgetWatch != nil {
+                    forgetWatch()
+                }
+            }()
+
             execute := func() {
                 noteExecutingDelta(1)
                 defer noteExecutingDelta(-1)
                 served = true
-            innerCtx := ctx
-            innerReq := r
-            var watchInitializationSignal utilflowcontrol.InitializationSignal
-            if isWatchRequest {
-                watchInitializationSignal = newInitializationSignal()
-                innerCtx = utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
-                innerReq = r.Clone(innerCtx)
-            }
                 setResponseHeaders(classification, w)
                 forgetWatch = fcIfc.RegisterWatch(requestInfo)
-            if isWatchRequest {
+
+                // Notify the main thread that we're ready to start the watch.
+                close(shouldStartWatchCh)
+
+                // Wait until the request is finished from the APF point of view
+                // (which is when its initialization is done).
+                watchInitializationSignal.Wait()
+            }
+
+            // Ensure that an item can be put to resultCh asynchronously.
+            resultCh := make(chan interface{}, 1)
+
+            // Call Handle in a separate goroutine.
+            // The reason for it is that from the APF point of view, the request processing
+            // finishes as soon as the watch is initialized (which is generally orders of
+            // magnitude faster than the watch request itself). This means that the Handle()
+            // call finishes much faster, and for performance reasons we want to reduce
+            // the number of running goroutines - so we run the shorter thing in a
+            // dedicated goroutine and the actual watch handler in the main one.
             go func() {
                 defer func() {
                     err := recover()
@@ -149,35 +196,60 @@ func WithPriorityAndFairness(
                         buf = buf[:runtime.Stack(buf, false)]
                         err = fmt.Sprintf("%v\n%s", err, buf)
                     }
+
+                    // Ensure that the result is put into resultCh independently of the panic.
                     resultCh <- err
                 }()
-                // Protect from the situations when request will not reach storage layer
-                // and the initialization signal will not be send.
-                defer watchInitializationSignal.Signal()
-
-                handler.ServeHTTP(w, innerReq)
+
+                // We create handleCtx with an explicit cancellation function.
+                // The reason for it is that Handle() underneath may start an additional goroutine
+                // that is blocked on context cancellation. However, from the APF point of view,
+                // we don't want to wait until the whole watch request is processed (which is
+                // when its context is actually cancelled) - we want to unblock the goroutine as
+                // soon as the request is processed from the APF point of view.
+                //
+                // Note that we explicitly do NOT call the actual handler using that context
+                // to avoid cancelling the request too early.
+                handleCtx, handleCtxCancel := context.WithCancel(ctx)
+                defer handleCtxCancel()
+
+                // Note that Handle will return irrespective of whether the request
+                // executes or is rejected. In the latter case, the function will return
+                // without calling the passed `execute` function.
+                fcIfc.Handle(handleCtx, digest, note, queueNote, execute)
             }()
-                watchInitializationSignal.Wait()
-            } else {
-                handler.ServeHTTP(w, innerReq)
-            }
-        }
-        // find the estimated "width" of the request
-        // TODO: Maybe just make it costEstimator and let it return additionalLatency too for the watch?
-        // TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
-        width := widthEstimator.EstimateWidth(r)
-        digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width}
-        fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
-            if inQueue {
-                noteWaitingDelta(1)
-            } else {
-                noteWaitingDelta(-1)
-            }
-        }, execute)
+
+            select {
+            case <-shouldStartWatchCh:
+                watchCtx := utilflowcontrol.WithInitializationSignal(ctx, watchInitializationSignal)
+                watchReq = r.WithContext(watchCtx)
+                handler.ServeHTTP(w, watchReq)
+                // Protect from the situation when the request will not reach the storage layer
+                // and the initialization signal will not be sent.
+                // It has to happen before waiting on the resultCh below.
+                watchInitializationSignal.Signal()
+                // TODO: Consider finishing the request as soon as the Handle call panics.
+                if err := <-resultCh; err != nil {
+                    panic(err)
+                }
+            case err := <-resultCh:
+                if err != nil {
+                    panic(err)
+                }
+            }
+        } else {
+            execute := func() {
+                noteExecutingDelta(1)
+                defer noteExecutingDelta(-1)
+                served = true
+                setResponseHeaders(classification, w)
+                handler.ServeHTTP(w, r)
+            }
+
+            fcIfc.Handle(ctx, digest, note, queueNote, execute)
+        }
+
         if !served {
             setResponseHeaders(classification, w)
@@ -187,26 +259,8 @@ func WithPriorityAndFairness(
                 epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc()
             }
             epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests)
-            if isWatchRequest {
-                close(resultCh)
-            }
             tooManyRequests(r, w)
         }
-
-        // For watch requests, from the APF point of view the request is already
-        // finished at this point. However, that doesn't mean it is already finished
-        // from the non-APF point of view. So we need to wait here until the request is:
-        // 1) finished being processed or
-        // 2) rejected
-        if isWatchRequest {
-            if forgetWatch != nil {
-                forgetWatch()
-            }
-            err := <-resultCh
-            if err != nil {
-                panic(err)
-            }
-        }
     })
 }
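
As an aside for readers outside the apiserver codebase, the coordination introduced above can be summarized in a minimal, self-contained Go sketch: APF admission runs in a side goroutine, the long-lived watch handler stays on the request goroutine, and the two meet on a "start watch" channel, an initialization signal, and a buffered result channel. The names initializationSignal, handleWatch, apfHandle, and serveWatch below are illustrative stand-ins for utilflowcontrol.InitializationSignal, fcIfc.Handle, and handler.ServeHTTP, not the real implementation.

package main

import (
	"fmt"
	"sync"
)

// initializationSignal is a stand-in for utilflowcontrol.InitializationSignal:
// Signal() may be called any number of times; Wait() returns once Signal()
// has been called at least once.
type initializationSignal struct {
	once sync.Once
	done chan struct{}
}

func newInitializationSignal() *initializationSignal {
	return &initializationSignal{done: make(chan struct{})}
}

func (s *initializationSignal) Signal() { s.once.Do(func() { close(s.done) }) }
func (s *initializationSignal) Wait()   { <-s.done }

// handleWatch mirrors the structure of the new watch path. apfHandle stands in
// for fcIfc.Handle (it either calls execute or rejects without calling it);
// serveWatch stands in for handler.ServeHTTP and signals once the watch is
// initialized.
func handleWatch(apfHandle func(execute func()), serveWatch func(*initializationSignal)) (served bool, panicVal interface{}) {
	shouldStartWatchCh := make(chan struct{})
	initSignal := newInitializationSignal()

	execute := func() {
		served = true
		// Let the request goroutine start the actual watch handler ...
		close(shouldStartWatchCh)
		// ... and stay "executing", from the APF point of view, only until the
		// watch is initialized, not until the long-running watch terminates.
		initSignal.Wait()
	}

	// Buffered so the admission goroutine can always deliver its result and exit.
	resultCh := make(chan interface{}, 1)
	go func() {
		defer func() { resultCh <- recover() }()
		apfHandle(execute) // returns once execute finished or the request was rejected
	}()

	select {
	case <-shouldStartWatchCh:
		// Admitted: run the watch handler in this goroutine.
		serveWatch(initSignal)
		// In case the handler never reached the storage layer.
		initSignal.Signal()
		panicVal = <-resultCh
	case panicVal = <-resultCh:
		// Rejected (or apfHandle panicked) before execute ran.
	}
	return served, panicVal
}

func main() {
	// Admitted request: execute is called, the watch runs, and the APF slot is
	// released as soon as the watch reports it is initialized.
	served, _ := handleWatch(
		func(execute func()) { execute() },
		func(sig *initializationSignal) {
			fmt.Println("watch initialized")
			sig.Signal()
		},
	)
	fmt.Println("admitted, served:", served)

	// Rejected request: execute is never called and the watch never starts.
	served, _ = handleWatch(
		func(execute func()) {},
		func(sig *initializationSignal) { fmt.Println("unreachable") },
	)
	fmt.Println("rejected, served:", served)
}

The sketch preserves the two details that matter in the change above: the result channel has capacity 1 so the short-lived admission goroutine never blocks on delivering its outcome, and the goroutine that is spawned is the short admission call rather than the watch itself, which keeps the number of long-lived goroutines per watch unchanged.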

View File

@@ -351,6 +351,23 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
     })
 }
 
+func TestApfCancelWaitRequest(t *testing.T) {
+    epmetrics.Register()
+
+    server := newApfServerWithSingleRequest(t, decisionCancelWait)
+    defer server.Close()
+
+    if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
+        t.Error(err)
+    }
+
+    checkForExpectedMetrics(t, []string{
+        "apiserver_current_inflight_requests",
+        "apiserver_request_terminations_total",
+        "apiserver_dropped_requests_total",
+    })
+}
+
 type fakeWatchApfFilter struct {
     lock     sync.Mutex
     inflight int
@@ -556,23 +573,6 @@ func TestContextClosesOnRequestProcessed(t *testing.T) {
     wg.Wait()
 }
 
-func TestApfCancelWaitRequest(t *testing.T) {
-    epmetrics.Register()
-
-    server := newApfServerWithSingleRequest(t, decisionCancelWait)
-    defer server.Close()
-
-    if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil {
-        t.Error(err)
-    }
-
-    checkForExpectedMetrics(t, []string{
-        "apiserver_current_inflight_requests",
-        "apiserver_request_terminations_total",
-        "apiserver_dropped_requests_total",
-    })
-}
-
 type fakeFilterRequestDigest struct {
     *fakeApfFilter
     requestDigestGot *utilflowcontrol.RequestDigest

View File

@@ -49,6 +49,8 @@ type Interface interface {
     // that the request should be executed then `execute()` will be
     // invoked once to execute the request; otherwise `execute()` will
     // not be invoked.
+    // Handle() should never return while execute() is running, even if
+    // ctx is cancelled or times out.
     Handle(ctx context.Context,
         requestDigest RequestDigest,
         noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
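
The strengthened contract documented above is what the filter relies on when it reads forgetWatch in its deferred cleanup only after Handle() has returned. The toy sketch below shows a caller depending on that guarantee; the Handler interface and admitAll type are hypothetical stand-ins, not the real flowcontrol.Interface or any of its implementations.

package main

import (
	"context"
	"fmt"
)

// Handler is a trimmed-down mirror of the documented contract: Handle must
// not return while execute is still running, even if ctx is cancelled or
// times out.
type Handler interface {
	Handle(ctx context.Context, execute func())
}

// admitAll is a toy implementation that trivially honours the contract by
// calling execute synchronously, so execute has finished by the time Handle
// returns.
type admitAll struct{}

func (admitAll) Handle(_ context.Context, execute func()) { execute() }

func main() {
	var h Handler = admitAll{}

	// Mirrors how the filter uses forgetWatch: assigned inside execute(),
	// read only after Handle() has returned. The "Handle never returns while
	// execute is running" guarantee is what makes this read race-free without
	// extra synchronization.
	var cleanup func()
	h.Handle(context.Background(), func() {
		cleanup = func() { fmt.Println("watcher forgotten") }
	})
	if cleanup != nil {
		cleanup()
	}
}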