From 223f9be59778b6ec2e44fd57df523f00e246bd95 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Wed, 7 Jul 2021 10:48:29 +0200 Subject: [PATCH 1/2] Estimate width of the request based on watchers count in P&F --- .../src/k8s.io/apiserver/pkg/server/config.go | 2 +- .../request/mutating_work_estimator.go | 92 +++++++++++ .../pkg/util/flowcontrol/request/width.go | 13 +- .../util/flowcontrol/request/width_test.go | 155 +++++++++++++++++- 4 files changed, 251 insertions(+), 11 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 524c8c0f5ae..48d6ab6242c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -758,7 +758,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = filterlatency.TrackStarted(handler, "authorization") if c.FlowControl != nil { - requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get) + requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount) handler = filterlatency.TrackCompleted(handler) handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) handler = filterlatency.TrackStarted(handler, "priorityandfairness") diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go new file mode 100644 index 00000000000..5202af2438d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/mutating_work_estimator.go @@ -0,0 +1,92 @@ +/* +Copyright 2021 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "math" + "net/http" + "time" + + apirequest "k8s.io/apiserver/pkg/endpoints/request" +) + +func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc { + estimator := &mutatingWorkEstimator{ + countFn: countFn, + } + return estimator.estimate +} + +type mutatingWorkEstimator struct { + countFn watchCountGetterFunc +} + +const ( + watchesPerSeat = 10.0 + eventAdditionalDuration = 5 * time.Millisecond +) + +func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { + requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) + if !ok { + // no RequestInfo should never happen, but to be on the safe side + // let's return a large value. + return WorkEstimate{ + InitialSeats: 1, + FinalSeats: maximumSeats, + AdditionalLatency: eventAdditionalDuration, + } + } + + watchCount := e.countFn(requestInfo) + + // The cost of the request associated with the watchers of that event + // consists of three parts: + // - cost of going through the event change logic + // - cost of serialization of the event + // - cost of processing an event object for each watcher (e.g. filtering, + // sending data over network) + // We're starting simple to get some operational experience with it and + // we will work on tuning the algorithm later. 
As a starting point we + // simply assume that processing 1 event takes 1/Nth of a seat for + // M milliseconds and processing different events is infinitely parallelizable. + // We simply record the appropriate values here and rely on potential + // reshaping of the request if the concurrency limit for a given priority + // level will not allow to run request with that many seats. + // + // TODO: As described in the KEP, we should take into account that not all + // events are equal and try to estimate the cost of a single event based on + // some historical data about size of events. + var finalSeats uint + var additionalLatency time.Duration + + // TODO: Make this unconditional after we tune the algorithm better. + // Technically, there is an overhead connected to processing an event after + // the request finishes even if there is a small number of watches. + // However, until we tune the estimation we want to stay on the safe side + // and avoid introducing additional latency for almost every single request. + if watchCount >= watchesPerSeat { + finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat)) + additionalLatency = eventAdditionalDuration + } + + return WorkEstimate{ + InitialSeats: 1, + FinalSeats: finalSeats, + AdditionalLatency: additionalLatency, + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index 56899194ed8..b2c6d860a7b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -65,12 +65,17 @@ func (we *WorkEstimate) MaxSeats() int { // number of objects for a given resource. type objectCountGetterFunc func(string) (int64, error) +// watchCountGetterFunc represents a function that gets the total +// number of watchers potentially interested in a given request. 
+type watchCountGetterFunc func(*apirequest.RequestInfo) int + // NewWorkEstimator estimates the work that will be done by a given request, // if no WorkEstimatorFunc matches the given request then the default // work estimate of 1 seat is allocated to the request. -func NewWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc { +func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc) WorkEstimatorFunc { estimator := &workEstimator{ - listWorkEstimator: newListWorkEstimator(countFn), + listWorkEstimator: newListWorkEstimator(objectCountFn), + mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn), } return estimator.estimate } @@ -87,6 +92,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate { type workEstimator struct { // listWorkEstimator estimates work for list request(s) listWorkEstimator WorkEstimatorFunc + // mutatingWorkEstimator calculates the width of mutating request(s) + mutatingWorkEstimator WorkEstimatorFunc } func (e *workEstimator) estimate(r *http.Request) WorkEstimate { @@ -100,6 +107,8 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate { switch requestInfo.Verb { case "list": return e.listWorkEstimator.EstimateWork(r) + case "create", "update", "patch", "delete": + return e.mutatingWorkEstimator.EstimateWork(r) } return WorkEstimate{InitialSeats: minimumSeats} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index 1ae860a6304..68cde253200 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -20,18 +20,22 @@ import ( "errors" "net/http" "testing" + "time" apirequest "k8s.io/apiserver/pkg/endpoints/request" ) func TestWorkEstimator(t *testing.T) { tests := []struct { - name string - requestURI string - requestInfo *apirequest.RequestInfo - 
counts map[string]int64 - countErr error - initialSeatsExpected uint + name string + requestURI string + requestInfo *apirequest.RequestInfo + counts map[string]int64 + countErr error + watchCount int + initialSeatsExpected uint + finalSeatsExpected uint + additionalLatencyExpected time.Duration }{ { name: "request has no RequestInfo", @@ -248,6 +252,132 @@ func TestWorkEstimator(t *testing.T) { countErr: errors.New("unknown error"), initialSeatsExpected: maximumSeats, }, + { + name: "request verb is create, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is create, watches registered", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is create, watches registered, no additional latency", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 5, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is create, watches registered, maximum is exceeded", + requestURI: "http://server/apis/foo.bar/v1/foos", + requestInfo: &apirequest.RequestInfo{ + Verb: "create", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 199, + initialSeatsExpected: 1, + finalSeatsExpected: 20, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is update, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "update", + 
APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is update, watches registered", + requestURI: "http://server/apis/foor.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "update", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is patch, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "patch", + APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is patch, watches registered", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "patch", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, + { + name: "request verb is delete, no watches", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "delete", + APIGroup: "foo.bar", + Resource: "foos", + }, + initialSeatsExpected: 1, + finalSeatsExpected: 0, + additionalLatencyExpected: 0, + }, + { + name: "request verb is delete, watches registered", + requestURI: "http://server/apis/foo.bar/v1/foos/myfoo", + requestInfo: &apirequest.RequestInfo{ + Verb: "delete", + APIGroup: "foo.bar", + Resource: "foos", + }, + watchCount: 29, + initialSeatsExpected: 1, + finalSeatsExpected: 3, + additionalLatencyExpected: 5 * time.Millisecond, + }, } for _, test := range tests { @@ -259,7 +389,10 @@ func TestWorkEstimator(t *testing.T) { countsFn := func(key string) (int64, error) { return counts[key], test.countErr } - estimator := NewWorkEstimator(countsFn) + 
watchCountsFn := func(_ *apirequest.RequestInfo) int { + return test.watchCount + } + estimator := NewWorkEstimator(countsFn, watchCountsFn) req, err := http.NewRequest("GET", test.requestURI, nil) if err != nil { @@ -272,7 +405,13 @@ func TestWorkEstimator(t *testing.T) { workestimateGot := estimator.EstimateWork(req) if test.initialSeatsExpected != workestimateGot.InitialSeats { - t.Errorf("Expected work estimate to match: %d seats, but got: %d seats", test.initialSeatsExpected, workestimateGot.InitialSeats) + t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats) + } + if test.finalSeatsExpected != workestimateGot.FinalSeats { + t.Errorf("Expected work estimate to match: %d final seats, but got: %d", test.finalSeatsExpected, workestimateGot.FinalSeats) + } + if test.additionalLatencyExpected != workestimateGot.AdditionalLatency { + t.Errorf("Expected work estimate to match additional latency: %v, but got: %v", test.additionalLatencyExpected, workestimateGot.AdditionalLatency) } }) } From c5a77d8a761b0651b53864f9e396d6f23efd01d2 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Fri, 8 Oct 2021 11:14:11 +0200 Subject: [PATCH 2/2] Adjust final seats if they don't fit the limit --- .../fairqueuing/queueset/queueset.go | 10 ++++++++++ .../fairqueuing/queueset/seatsecs_test.go | 19 +++++++++++++++++++ .../flowcontrol/fairqueuing/queueset/types.go | 19 ++++++++++++++++--- 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 08a4ab385fd..115ad73187d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -748,6 +748,16 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { return nil } 
+ // If the requested final seats exceed capacity of that queue, + // we reduce them to current capacity and adjust additional latency + // to preserve the total amount of work. + if oldestReqFromMinQueue.workEstimate.FinalSeats > uint(qs.dCfg.ConcurrencyLimit) { + finalSeats := uint(qs.dCfg.ConcurrencyLimit) + additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats)) + oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats + oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency + } + // we set the round robin indexing to start at the chose queue // for the next round. This way the non-selected queues // win in the case that the virtual finish times are the same diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go index b0a98b2570b..781f1424edc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/seatsecs_test.go @@ -43,3 +43,22 @@ func TestSeatSecondsString(t *testing.T) { } } } + +func TestSeatSecondsPerSeat(t *testing.T) { + testCases := []struct { + ss SeatSeconds + seats float64 + expect time.Duration + }{ + {ss: SeatsTimesDuration(10, time.Second), seats: 1, expect: 10 * time.Second}, + {ss: SeatsTimesDuration(1, time.Second), seats: 10, expect: 100 * time.Millisecond}, + {ss: SeatsTimesDuration(13, 5*time.Millisecond), seats: 5, expect: 13 * time.Millisecond}, + {ss: SeatsTimesDuration(12, 0), seats: 10, expect: 0}, + } + for _, testCase := range testCases { + actualDuration := testCase.ss.DurationPerSeat(testCase.seats) + if actualDuration != testCase.expect { + t.Errorf("DurationPerSeats returned %v rather than expected %q", actualDuration, testCase.expect) + } + } +} diff --git 
a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 7e9a3f3bdf1..08a6f6b3dfc 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -81,6 +81,7 @@ type request struct { type completedWorkEstimate struct { fcrequest.WorkEstimate totalWork SeatSeconds // initial plus final work + finalWork SeatSeconds // only final work } // queue is an array of requests with additional metadata required for @@ -122,14 +123,20 @@ func (req *request) totalWork() SeatSeconds { } func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWorkEstimate { + finalWork := qs.computeFinalWork(we) return completedWorkEstimate{ WorkEstimate: *we, - totalWork: qs.computeTotalWork(we), + totalWork: qs.computeInitialWork(we) + finalWork, + finalWork: finalWork, } } -func (qs *queueSet) computeTotalWork(we *fcrequest.WorkEstimate) SeatSeconds { - return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) + SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) +func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds { + return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) +} + +func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds { + return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) } // Enqueue enqueues a request into the queue and @@ -199,6 +206,12 @@ func (ss SeatSeconds) ToFloat() float64 { return float64(ss) / ssScale } +// DurationPerSeat returns duration per seat. +// This division may lose precision. +func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration { + return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale)) +} + // String converts to a string. 
// This is suitable for large as well as small values. func (ss SeatSeconds) String() string {