Merge pull request #103539 from wojtek-t/pf_estimate_watch_width

Estimate width of the request based on watchers count in P&F
This commit is contained in:
Kubernetes Prow Robot 2021-10-12 09:25:48 -07:00 committed by GitHub
commit af2ed2569d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 296 additions and 14 deletions

View File

@ -758,7 +758,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = filterlatency.TrackStarted(handler, "authorization")
if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")

View File

@ -795,6 +795,16 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
return nil
}
// If the requested final seats exceed capacity of that queue,
// we reduce them to current capacity and adjust additional latency
// to preserve the total amount of work.
if oldestReqFromMinQueue.workEstimate.FinalSeats > uint(qs.dCfg.ConcurrencyLimit) {
finalSeats := uint(qs.dCfg.ConcurrencyLimit)
additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats))
oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats
oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency
}
// we set the round robin indexing to start at the chose queue
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same

View File

@ -47,3 +47,22 @@ func TestSeatSecondsString(t *testing.T) {
}
}
}
func TestSeatSecondsPerSeat(t *testing.T) {
testCases := []struct {
ss SeatSeconds
seats float64
expect time.Duration
}{
{ss: SeatsTimesDuration(10, time.Second), seats: 1, expect: 10 * time.Second},
{ss: SeatsTimesDuration(1, time.Second), seats: 10, expect: 100 * time.Millisecond},
{ss: SeatsTimesDuration(13, 5*time.Millisecond), seats: 5, expect: 13 * time.Millisecond},
{ss: SeatsTimesDuration(12, 0), seats: 10, expect: 0},
}
for _, testCase := range testCases {
actualDuration := testCase.ss.DurationPerSeat(testCase.seats)
if actualDuration != testCase.expect {
t.Errorf("DurationPerSeats returned %v rather than expected %q", actualDuration, testCase.expect)
}
}
}

View File

@ -82,6 +82,7 @@ type request struct {
type completedWorkEstimate struct {
fcrequest.WorkEstimate
totalWork SeatSeconds // initial plus final work
finalWork SeatSeconds // only final work
}
// queue is an array of requests with additional metadata required for
@ -123,14 +124,20 @@ func (req *request) totalWork() SeatSeconds {
}
func (qs *queueSet) completeWorkEstimate(we *fcrequest.WorkEstimate) completedWorkEstimate {
finalWork := qs.computeFinalWork(we)
return completedWorkEstimate{
WorkEstimate: *we,
totalWork: qs.computeTotalWork(we),
totalWork: qs.computeInitialWork(we) + finalWork,
finalWork: finalWork,
}
}
func (qs *queueSet) computeTotalWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration) + SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
func (qs *queueSet) computeInitialWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.InitialSeats), qs.estimatedServiceDuration)
}
func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
}
// Enqueue enqueues a request into the queue and
@ -200,6 +207,12 @@ func (ss SeatSeconds) ToFloat() float64 {
return float64(ss) / ssScale
}
// DurationPerSeat returns duration per seat.
// This division may lose precision.
func (ss SeatSeconds) DurationPerSeat(seats float64) time.Duration {
return time.Duration(float64(ss) / seats * (float64(time.Second) / ssScale))
}
// String converts to a string.
// This is suitable for large as well as small values.
func (ss SeatSeconds) String() string {

View File

@ -0,0 +1,92 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"math"
"net/http"
"time"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc {
estimator := &mutatingWorkEstimator{
countFn: countFn,
}
return estimator.estimate
}
type mutatingWorkEstimator struct {
countFn watchCountGetterFunc
}
const (
watchesPerSeat = 10.0
eventAdditionalDuration = 5 * time.Millisecond
)
func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok {
// no RequestInfo should never happen, but to be on the safe side
// let's return a large value.
return WorkEstimate{
InitialSeats: 1,
FinalSeats: maximumSeats,
AdditionalLatency: eventAdditionalDuration,
}
}
watchCount := e.countFn(requestInfo)
// The cost of the request associated with the watchers of that event
// consists of three parts:
// - cost of going through the event change logic
// - cost of serialization of the event
// - cost of processing an event object for each watcher (e.g. filtering,
// sending data over network)
// We're starting simple to get some operational experience with it and
// we will work on tuning the algorithm later. As a starting point we
// we simply assume that processing 1 event takes 1/Nth of a seat for
// M milliseconds and processing different events is infinitely parallelizable.
// We simply record the appropriate values here and rely on potential
// reshaping of the request if the concurrency limit for a given priority
// level will not allow to run request with that many seats.
//
// TODO: As described in the KEP, we should take into account that not all
// events are equal and try to estimate the cost of a single event based on
// some historical data about size of events.
var finalSeats uint
var additionalLatency time.Duration
// TODO: Make this unconditional after we tune the algorithm better.
// Technically, there is an overhead connected to processing an event after
// the request finishes even if there is a small number of watches.
// However, until we tune the estimation we want to stay on the safe side
// an avoid introducing additional latency for almost every single request.
if watchCount >= watchesPerSeat {
finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat))
additionalLatency = eventAdditionalDuration
}
return WorkEstimate{
InitialSeats: 1,
FinalSeats: finalSeats,
AdditionalLatency: additionalLatency,
}
}

View File

@ -65,12 +65,17 @@ func (we *WorkEstimate) MaxSeats() int {
// number of objects for a given resource.
type objectCountGetterFunc func(string) (int64, error)
// watchCountGetterFunc represents a function that gets the total
// number of watchers potentially interested in a given request.
type watchCountGetterFunc func(*apirequest.RequestInfo) int
// NewWorkEstimator estimates the work that will be done by a given request,
// if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc {
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc) WorkEstimatorFunc {
estimator := &workEstimator{
listWorkEstimator: newListWorkEstimator(countFn),
listWorkEstimator: newListWorkEstimator(objectCountFn),
mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn),
}
return estimator.estimate
}
@ -87,6 +92,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate {
type workEstimator struct {
// listWorkEstimator estimates work for list request(s)
listWorkEstimator WorkEstimatorFunc
// mutatingWorkEstimator calculates the width of mutating request(s)
mutatingWorkEstimator WorkEstimatorFunc
}
func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
@ -100,6 +107,8 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
switch requestInfo.Verb {
case "list":
return e.listWorkEstimator.EstimateWork(r)
case "create", "update", "patch", "delete":
return e.mutatingWorkEstimator.EstimateWork(r)
}
return WorkEstimate{InitialSeats: minimumSeats}

View File

@ -20,18 +20,22 @@ import (
"errors"
"net/http"
"testing"
"time"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
func TestWorkEstimator(t *testing.T) {
tests := []struct {
name string
requestURI string
requestInfo *apirequest.RequestInfo
counts map[string]int64
countErr error
initialSeatsExpected uint
name string
requestURI string
requestInfo *apirequest.RequestInfo
counts map[string]int64
countErr error
watchCount int
initialSeatsExpected uint
finalSeatsExpected uint
additionalLatencyExpected time.Duration
}{
{
name: "request has no RequestInfo",
@ -248,6 +252,132 @@ func TestWorkEstimator(t *testing.T) {
countErr: errors.New("unknown error"),
initialSeatsExpected: maximumSeats,
},
{
name: "request verb is create, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is create, watches registered",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is create, watches registered, no additional latency",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 5,
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is create, watches registered, maximum is exceeded",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 199,
initialSeatsExpected: 1,
finalSeatsExpected: 20,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is update, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "update",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is update, watches registered",
requestURI: "http://server/apis/foor.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "update",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is patch, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "patch",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is patch, watches registered",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "patch",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is delete, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "delete",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is delete, watches registered",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "delete",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
}
for _, test := range tests {
@ -259,7 +389,10 @@ func TestWorkEstimator(t *testing.T) {
countsFn := func(key string) (int64, error) {
return counts[key], test.countErr
}
estimator := NewWorkEstimator(countsFn)
watchCountsFn := func(_ *apirequest.RequestInfo) int {
return test.watchCount
}
estimator := NewWorkEstimator(countsFn, watchCountsFn)
req, err := http.NewRequest("GET", test.requestURI, nil)
if err != nil {
@ -272,7 +405,13 @@ func TestWorkEstimator(t *testing.T) {
workestimateGot := estimator.EstimateWork(req)
if test.initialSeatsExpected != workestimateGot.InitialSeats {
t.Errorf("Expected work estimate to match: %d seats, but got: %d seats", test.initialSeatsExpected, workestimateGot.InitialSeats)
t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats)
}
if test.finalSeatsExpected != workestimateGot.FinalSeats {
t.Errorf("Expected work estimate to match: %d final seats, but got: %d", test.finalSeatsExpected, workestimateGot.FinalSeats)
}
if test.additionalLatencyExpected != workestimateGot.AdditionalLatency {
t.Errorf("Expected work estimate to match additional latency: %v, but got: %v", test.additionalLatencyExpected, workestimateGot.AdditionalLatency)
}
})
}