Merge pull request #101905 from tkashem/apf-request-width

apf: add plumbing to calculate "width" of a request
This commit is contained in:
Kubernetes Prow Robot 2021-06-09 11:25:27 -07:00 committed by GitHub
commit 624967e940
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 137 additions and 29 deletions

View File

@ -64,6 +64,7 @@ import (
"k8s.io/apiserver/pkg/storageversion"
"k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/logs"
@ -215,6 +216,9 @@ type Config struct {
// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
MergedResourceConfig *serverstore.ResourceConfig
// RequestWidthEstimator is used to estimate the "width" of the incoming request(s).
RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc
//===========================================================================
// values below here are targets for removal
//===========================================================================
@ -338,6 +342,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator,
APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),
}
@ -728,7 +734,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
if c.FlowControl != nil {
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, c.RequestWidthEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)

View File

@ -28,6 +28,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/klog/v2"
)
@ -59,6 +60,7 @@ func WithPriorityAndFairness(
handler http.Handler,
longRunningRequestCheck apirequest.LongRunningRequestCheck,
fcIfc utilflowcontrol.Interface,
widthEstimator flowcontrolrequest.WidthEstimatorFunc,
) http.Handler {
if fcIfc == nil {
klog.Warningf("priority and fairness support not found, skipping")
@ -159,7 +161,11 @@ func WithPriorityAndFairness(
handler.ServeHTTP(w, innerReq)
}
}
digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user}
// find the estimated "width" of the request
width := widthEstimator.EstimateWidth(r)
digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width}
fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
if inQueue {
noteWaitingDelta(1)

View File

@ -50,6 +50,8 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
"github.com/google/go-cmp/cmp"
)
func TestMain(m *testing.M) {
@ -67,6 +69,8 @@ const (
decisionSkipFilter
)
var defaultRequestWidthEstimator = func(*http.Request) uint { return 1 }
type fakeApfFilter struct {
mockDecision mockDecision
postEnqueue func()
@ -157,7 +161,7 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
onExecute()
}), longRunningRequestCheck, flowControlFilter)
}), longRunningRequestCheck, flowControlFilter, defaultRequestWidthEstimator)
handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@ -562,6 +566,50 @@ func TestApfCancelWaitRequest(t *testing.T) {
})
}
type fakeFilterRequestDigest struct {
*fakeApfFilter
requestDigestGot *utilflowcontrol.RequestDigest
}
func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
_ fq.QueueNoteFn, _ func(),
) {
f.requestDigestGot = &requestDigest
}
func TestApfWithRequestDigest(t *testing.T) {
longRunningFunc := func(_ *http.Request, _ *apirequest.RequestInfo) bool { return false }
fakeFilter := &fakeFilterRequestDigest{}
reqDigestExpected := &utilflowcontrol.RequestDigest{
RequestInfo: &apirequest.RequestInfo{Verb: "get"},
User: &user.DefaultInfo{Name: "foo"},
Width: 5,
}
handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
longRunningFunc,
fakeFilter,
func(_ *http.Request) uint { return reqDigestExpected.Width },
)
w := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "/bar", nil)
if err != nil {
t.Fatalf("Failed to create new http request - %v", err)
}
req = req.WithContext(apirequest.WithRequestInfo(req.Context(), reqDigestExpected.RequestInfo))
req = req.WithContext(apirequest.WithUser(req.Context(), reqDigestExpected.User))
handler.ServeHTTP(w, req)
if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) {
t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot))
}
}
func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
fcmetrics.Register()
@ -1058,7 +1106,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter)
apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWidthEstimator)
// add the handler in the chain that adds the specified user to the request context
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@ -82,6 +82,7 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af
type RequestDigest struct {
RequestInfo *request.RequestInfo
User user.Info
Width uint
}
// `*configController` maintains eventual consistency with the API
@ -804,7 +805,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
}
startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
req, idle := plState.queues.StartRequest(ctx, rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle {
cfgCtlr.maybeReapLocked(plName, plState)
}

View File

@ -139,7 +139,7 @@ func (cqs *ctlrTestQueueSet) IsIdle() bool {
return cqs.countActive == 0
}
func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
cqs.cts.lock.Lock()
defer cqs.cts.lock.Unlock()
cqs.countActive++

View File

@ -80,7 +80,7 @@ type QueueSet interface {
// was idle at the moment of the return. Otherwise idle==false
// and the client must call the Finish method of the Request
// exactly once.
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
// UpdateObservations makes sure any time-based statistics have
// caught up with the current clock reading

View File

@ -236,10 +236,7 @@ const (
// executing at each point where there is a change in that quantity,
// because the metrics --- and only the metrics --- track that
// quantity per FlowSchema.
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
// all request(s) have a width of 1, in keeping with the current behavior
width := 1.0
func (qs *queueSet) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
var req *request
@ -320,7 +317,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
// Seats returns the number of seats this request requires.
func (req *request) Seats() int {
return int(math.Ceil(req.width))
return int(req.width)
}
func (req *request) NoteQueued(inQueue bool) {
@ -440,7 +437,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
// returns the enqueud request on a successful enqueue
// returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width float64, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
@ -578,7 +575,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
}
}
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width float64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width uint, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
now := qs.clock.Now()
req := &request{
qs: qs,

View File

@ -226,7 +226,7 @@ func (ust *uniformScenarioThread) callK(k int) {
if k >= ust.nCalls {
return
}
req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
req, idle := ust.uss.qs.StartRequest(context.Background(), 1, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
if req == nil {
atomic.AddUint64(&ust.uss.failedCount, 1)
@ -658,7 +658,7 @@ func TestContextCancel(t *testing.T) {
ctx1 := context.Background()
b2i := map[bool]int{false: 0, true: 1}
var qnc [2][2]int32
req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
req1, _ := qs.StartRequest(ctx1, 1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
if req1 == nil {
t.Error("Request rejected")
return
@ -686,7 +686,7 @@ func TestContextCancel(t *testing.T) {
counter.Add(1)
cancel2()
}()
req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
req2, idle2a := qs.StartRequest(ctx2, 1, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
if idle2a {
t.Error("2nd StartRequest returned idle")
}
@ -745,7 +745,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
}
ctx := context.Background()
req, _ := qs.StartRequest(ctx, 1, "", "fs", "test", "one", func(inQueue bool) {})
req, _ := qs.StartRequest(ctx, 1, 1, "", "fs", "test", "one", func(inQueue bool) {})
if req == nil {
t.Fatal("expected a Request object from StartRequest, but got nil")
}

View File

@ -44,7 +44,7 @@ type request struct {
startTime time.Time
// width of the request
width float64
width uint
// decision gets set to a `requestDecision` indicating what to do
// with this request. It gets set exactly once, when the request

View File

@ -55,7 +55,7 @@ func (noRestraint) IsIdle() bool {
return false
}
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
func (noRestraint) StartRequest(ctx context.Context, width uint, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
return noRestraintRequest{}, false
}

View File

@ -89,7 +89,7 @@ func TestLiterals(t *testing.T) {
ui := &user.DefaultInfo{Name: "goodu", UID: "1",
Groups: []string{"goodg1", "goodg2"}}
reqRN := RequestDigest{
&request.RequestInfo{
RequestInfo: &request.RequestInfo{
IsResourceRequest: true,
Path: "/apis/goodapig/v1/namespaces/goodns/goodrscs",
Verb: "goodverb",
@ -99,10 +99,13 @@ func TestLiterals(t *testing.T) {
Namespace: "goodns",
Resource: "goodrscs",
Name: "eman",
Parts: []string{"goodrscs", "eman"}},
ui}
Parts: []string{"goodrscs", "eman"},
},
User: ui,
Width: 1,
}
reqRU := RequestDigest{
&request.RequestInfo{
RequestInfo: &request.RequestInfo{
IsResourceRequest: true,
Path: "/apis/goodapig/v1/goodrscs",
Verb: "goodverb",
@ -112,14 +115,20 @@ func TestLiterals(t *testing.T) {
Namespace: "",
Resource: "goodrscs",
Name: "eman",
Parts: []string{"goodrscs", "eman"}},
ui}
Parts: []string{"goodrscs", "eman"},
},
User: ui,
Width: 1,
}
reqN := RequestDigest{
&request.RequestInfo{
RequestInfo: &request.RequestInfo{
IsResourceRequest: false,
Path: "/openapi/v2",
Verb: "goodverb"},
ui}
Verb: "goodverb",
},
User: ui,
Width: 1,
}
checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,
User: &flowcontrol.UserSubject{"goodu"}}},

View File

@ -0,0 +1,40 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"net/http"
)
// DefaultWidthEstimator returns returns '1' as the "width"
// of the given request.
//
// TODO: when we plumb in actual "width" handling for different
// type of request(s) this function will iterate through a chain
// of widthEstimator instance(s).
func DefaultWidthEstimator(_ *http.Request) uint {
return 1
}
// WidthEstimatorFunc returns the estimated "width" of a given request.
// This function will be used by the Priority & Fairness filter to
// estimate the "width" of incoming requests.
type WidthEstimatorFunc func(*http.Request) uint
func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) uint {
return e(r)
}

1
vendor/modules.txt vendored
View File

@ -1494,6 +1494,7 @@ k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing
k8s.io/apiserver/pkg/util/flowcontrol/format
k8s.io/apiserver/pkg/util/flowcontrol/metrics
k8s.io/apiserver/pkg/util/flowcontrol/request
k8s.io/apiserver/pkg/util/flushwriter
k8s.io/apiserver/pkg/util/openapi
k8s.io/apiserver/pkg/util/proxy