From 223f9be59778b6ec2e44fd57df523f00e246bd95 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Wed, 7 Jul 2021 10:48:29 +0200 Subject: [PATCH] Estimate width of the request based on watchers count in P&F --- .../src/k8s.io/apiserver/pkg/server/config.go | 2 +- .../request/mutating_work_estimator.go | 92 +++++++++++ .../pkg/util/flowcontrol/request/width.go | 13 +- .../util/flowcontrol/request/width_test.go | 155 +++++++++++++++++- 4 files changed, 251 insertions(+), 11 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 524c8c0f5ae..48d6ab6242c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -758,7 +758,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = filterlatency.TrackStarted(handler, "authorization") if c.FlowControl != nil { - requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get) + requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount) handler = filterlatency.TrackCompleted(handler) handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) handler = filterlatency.TrackStarted(handler, "priorityandfairness") diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go new file mode 100644 index 00000000000..5202af2438d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go @@ -0,0 +1,92 @@ +/* +Copyright 2021 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "math" + "net/http" + "time" + + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc { + estimator := &mutatingWorkEstimator{ + countFn: countFn, + } + return estimator.estimate +} + +type mutatingWorkEstimator struct { + countFn watchCountGetterFunc +} + +const ( + watchesPerSeat = 10.0 + eventAdditionalDuration = 5 * time.Millisecond +) + +func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { + requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) + if !ok { + // no RequestInfo should never happen, but to be on the safe side + // let's return a large value. + return WorkEstimate{ + InitialSeats: 1, + FinalSeats: maximumSeats, + AdditionalLatency: eventAdditionalDuration, + } + } + + watchCount := e.countFn(requestInfo) + + // The cost of the request associated with the watchers of that event + // consists of three parts: + // - cost of going through the event change logic + // - cost of serialization of the event + // - cost of processing an event object for each watcher (e.g. filtering, + // sending data over network) + // We're starting simple to get some operational experience with it and + // we will work on tuning the algorithm later. 
As a starting point we + // simply assume that processing 1 event takes 1/Nth of a seat for + // M milliseconds and processing different events is infinitely parallelizable. + // We simply record the appropriate values here and rely on potential + // reshaping of the request if the concurrency limit for a given priority + // level does not allow running the request with that many seats. + // + // TODO: As described in the KEP, we should take into account that not all + // events are equal and try to estimate the cost of a single event based on + // some historical data about size of events. + var finalSeats uint + var additionalLatency time.Duration + + // TODO: Make this unconditional after we tune the algorithm better. + // Technically, there is an overhead connected to processing an event after + // the request finishes even if there is a small number of watches. + // However, until we tune the estimation we want to stay on the safe side + // and avoid introducing additional latency for almost every single request. + if watchCount >= watchesPerSeat { + finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat)) + additionalLatency = eventAdditionalDuration + } + + return WorkEstimate{ + InitialSeats: 1, + FinalSeats: finalSeats, + AdditionalLatency: additionalLatency, + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index 56899194ed8..b2c6d860a7b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -65,12 +65,17 @@ func (we *WorkEstimate) MaxSeats() int { // number of objects for a given resource. type objectCountGetterFunc func(string) (int64, error) +// watchCountGetterFunc represents a function that gets the total +// number of watchers potentially interested in a given request. 
+type watchCountGetterFunc func(*apirequest.RequestInfo) int + // NewWorkEstimator estimates the work that will be done by a given request, // if no WorkEstimatorFunc matches the given request then the default // work estimate of 1 seat is allocated to the request. -func NewWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc { +func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc) WorkEstimatorFunc { estimator := &workEstimator{ - listWorkEstimator: newListWorkEstimator(countFn), + listWorkEstimator: newListWorkEstimator(objectCountFn), + mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn), } return estimator.estimate } @@ -87,6 +92,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate { type workEstimator struct { // listWorkEstimator estimates work for list request(s) listWorkEstimator WorkEstimatorFunc + // mutatingWorkEstimator calculates the width of mutating request(s) + mutatingWorkEstimator WorkEstimatorFunc } func (e *workEstimator) estimate(r *http.Request) WorkEstimate { @@ -100,6 +107,8 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate { switch requestInfo.Verb { case "list": return e.listWorkEstimator.EstimateWork(r) + case "create", "update", "patch", "delete": + return e.mutatingWorkEstimator.EstimateWork(r) } return WorkEstimate{InitialSeats: minimumSeats} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index 1ae860a6304..68cde253200 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -20,18 +20,22 @@ import ( "errors" "net/http" "testing" + "time" apirequest "k8s.io/apiserver/pkg/endpoints/request" ) func TestWorkEstimator(t *testing.T) { tests := []struct { - name string - requestURI string - requestInfo *apirequest.RequestInfo - 
counts map[string]int64 - countErr error - initialSeatsExpected uint + name string + requestURI string + requestInfo *apirequest.RequestInfo + counts map[string]int64 + countErr error + watchCount int + initialSeatsExpected uint + finalSeatsExpected uint + additionalLatencyExpected time.Duration }{ { name: "request has no RequestInfo", @@ -248,6 +252,132 @@ func TestWorkEstimator(t *testing.T) { countErr: errors.New("unknown error"), initialSeatsExpected: maximumSeats, }, + { + name: "request verb is create, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is create, watches registered", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is create, watches registered, no additional latency", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 5, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is create, watches registered, maximum is exceeded", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 199, + initialSeatsExpected: 1, + finalSeatsExpected: 20, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is update, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "update", + 
APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is update, watches registered", + requestURI: "http://server/apis/foor.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "update", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is patch, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "patch", + APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is patch, watches registered", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "patch", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is delete, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "delete", + APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is delete, watches registered", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "delete", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, } for _, test := range tests { @@ -259,7 +389,10 @@ func TestWorkEstimator(t *testing.T) { countsFn := func(key string) (int64, error) { return counts[key], test.countErr } - estimator := NewWorkEstimator(countsFn) + 
watchCountsFn := func(_ *apirequest.RequestInfo) int { + return test.watchCount + } + estimator := NewWorkEstimator(countsFn, watchCountsFn) req, err := http.NewRequest("GET", test.requestURI, nil) if err != nil { @@ -272,7 +405,13 @@ func TestWorkEstimator(t *testing.T) { workestimateGot := estimator.EstimateWork(req) if test.initialSeatsExpected != workestimateGot.InitialSeats { - t.Errorf("Expected work estimate to match: %d seats, but got: %d seats", test.initialSeatsExpected, workestimateGot.InitialSeats) + t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats) + } + if test.finalSeatsExpected != workestimateGot.FinalSeats { + t.Errorf("Expected work estimate to match: %d final seats, but got: %d", test.finalSeatsExpected, workestimateGot.FinalSeats) + } + if test.additionalLatencyExpected != workestimateGot.AdditionalLatency { + t.Errorf("Expected work estimate to match additional latency: %v, but got: %v", test.additionalLatencyExpected, workestimateGot.AdditionalLatency) } }) }