Estimate width of the request based on watchers count in P&F

This commit is contained in:
wojtekt 2021-07-07 10:48:29 +02:00
parent 8d0acaa3ff
commit 223f9be597
4 changed files with 251 additions and 11 deletions

View File

@ -758,7 +758,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = filterlatency.TrackStarted(handler, "authorization")
if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get)
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")

View File

@ -0,0 +1,92 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"math"
"net/http"
"time"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
func newMutatingWorkEstimator(countFn watchCountGetterFunc) WorkEstimatorFunc {
estimator := &mutatingWorkEstimator{
countFn: countFn,
}
return estimator.estimate
}
type mutatingWorkEstimator struct {
countFn watchCountGetterFunc
}
const (
watchesPerSeat = 10.0
eventAdditionalDuration = 5 * time.Millisecond
)
func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok {
// no RequestInfo should never happen, but to be on the safe side
// let's return a large value.
return WorkEstimate{
InitialSeats: 1,
FinalSeats: maximumSeats,
AdditionalLatency: eventAdditionalDuration,
}
}
watchCount := e.countFn(requestInfo)
// The cost of the request associated with the watchers of that event
// consists of three parts:
// - cost of going through the event change logic
// - cost of serialization of the event
// - cost of processing an event object for each watcher (e.g. filtering,
// sending data over network)
// We're starting simple to get some operational experience with it and
// we will work on tuning the algorithm later. As a starting point we
// we simply assume that processing 1 event takes 1/Nth of a seat for
// M milliseconds and processing different events is infinitely parallelizable.
// We simply record the appropriate values here and rely on potential
// reshaping of the request if the concurrency limit for a given priority
// level will not allow to run request with that many seats.
//
// TODO: As described in the KEP, we should take into account that not all
// events are equal and try to estimate the cost of a single event based on
// some historical data about size of events.
var finalSeats uint
var additionalLatency time.Duration
// TODO: Make this unconditional after we tune the algorithm better.
// Technically, there is an overhead connected to processing an event after
// the request finishes even if there is a small number of watches.
// However, until we tune the estimation we want to stay on the safe side
// an avoid introducing additional latency for almost every single request.
if watchCount >= watchesPerSeat {
finalSeats = uint(math.Ceil(float64(watchCount) / watchesPerSeat))
additionalLatency = eventAdditionalDuration
}
return WorkEstimate{
InitialSeats: 1,
FinalSeats: finalSeats,
AdditionalLatency: additionalLatency,
}
}

View File

@ -65,12 +65,17 @@ func (we *WorkEstimate) MaxSeats() int {
// number of objects for a given resource.
type objectCountGetterFunc func(string) (int64, error)
// watchCountGetterFunc represents a function that gets the total
// number of watchers potentially interested in a given request.
type watchCountGetterFunc func(*apirequest.RequestInfo) int
// NewWorkEstimator estimates the work that will be done by a given request,
// if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(countFn objectCountGetterFunc) WorkEstimatorFunc {
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc) WorkEstimatorFunc {
estimator := &workEstimator{
listWorkEstimator: newListWorkEstimator(countFn),
listWorkEstimator: newListWorkEstimator(objectCountFn),
mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn),
}
return estimator.estimate
}
@ -87,6 +92,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate {
type workEstimator struct {
// listWorkEstimator estimates work for list request(s)
listWorkEstimator WorkEstimatorFunc
// mutatingWorkEstimator calculates the width of mutating request(s)
mutatingWorkEstimator WorkEstimatorFunc
}
func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
@ -100,6 +107,8 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
switch requestInfo.Verb {
case "list":
return e.listWorkEstimator.EstimateWork(r)
case "create", "update", "patch", "delete":
return e.mutatingWorkEstimator.EstimateWork(r)
}
return WorkEstimate{InitialSeats: minimumSeats}

View File

@ -20,18 +20,22 @@ import (
"errors"
"net/http"
"testing"
"time"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
func TestWorkEstimator(t *testing.T) {
tests := []struct {
name string
requestURI string
requestInfo *apirequest.RequestInfo
counts map[string]int64
countErr error
initialSeatsExpected uint
name string
requestURI string
requestInfo *apirequest.RequestInfo
counts map[string]int64
countErr error
watchCount int
initialSeatsExpected uint
finalSeatsExpected uint
additionalLatencyExpected time.Duration
}{
{
name: "request has no RequestInfo",
@ -248,6 +252,132 @@ func TestWorkEstimator(t *testing.T) {
countErr: errors.New("unknown error"),
initialSeatsExpected: maximumSeats,
},
{
name: "request verb is create, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is create, watches registered",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is create, watches registered, no additional latency",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 5,
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is create, watches registered, maximum is exceeded",
requestURI: "http://server/apis/foo.bar/v1/foos",
requestInfo: &apirequest.RequestInfo{
Verb: "create",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 199,
initialSeatsExpected: 1,
finalSeatsExpected: 20,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is update, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "update",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is update, watches registered",
requestURI: "http://server/apis/foor.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "update",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is patch, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "patch",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is patch, watches registered",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "patch",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
{
name: "request verb is delete, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "delete",
APIGroup: "foo.bar",
Resource: "foos",
},
initialSeatsExpected: 1,
finalSeatsExpected: 0,
additionalLatencyExpected: 0,
},
{
name: "request verb is delete, watches registered",
requestURI: "http://server/apis/foo.bar/v1/foos/myfoo",
requestInfo: &apirequest.RequestInfo{
Verb: "delete",
APIGroup: "foo.bar",
Resource: "foos",
},
watchCount: 29,
initialSeatsExpected: 1,
finalSeatsExpected: 3,
additionalLatencyExpected: 5 * time.Millisecond,
},
}
for _, test := range tests {
@ -259,7 +389,10 @@ func TestWorkEstimator(t *testing.T) {
countsFn := func(key string) (int64, error) {
return counts[key], test.countErr
}
estimator := NewWorkEstimator(countsFn)
watchCountsFn := func(_ *apirequest.RequestInfo) int {
return test.watchCount
}
estimator := NewWorkEstimator(countsFn, watchCountsFn)
req, err := http.NewRequest("GET", test.requestURI, nil)
if err != nil {
@ -272,7 +405,13 @@ func TestWorkEstimator(t *testing.T) {
workestimateGot := estimator.EstimateWork(req)
if test.initialSeatsExpected != workestimateGot.InitialSeats {
t.Errorf("Expected work estimate to match: %d seats, but got: %d seats", test.initialSeatsExpected, workestimateGot.InitialSeats)
t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats)
}
if test.finalSeatsExpected != workestimateGot.FinalSeats {
t.Errorf("Expected work estimate to match: %d final seats, but got: %d", test.finalSeatsExpected, workestimateGot.FinalSeats)
}
if test.additionalLatencyExpected != workestimateGot.AdditionalLatency {
t.Errorf("Expected work estimate to match additional latency: %v, but got: %v", test.additionalLatencyExpected, workestimateGot.AdditionalLatency)
}
})
}