Merge pull request #118933 from wojtek-t/apf_watchlist_support

Add support for watchlist to APF
This commit is contained in:
Kubernetes Prow Robot 2023-07-13 23:07:49 -07:00 committed by GitHub
commit 18e0e668ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 1 deletions

View File

@ -68,7 +68,16 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// return maximumSeats for this request to be consistent.
return WorkEstimate{InitialSeats: e.config.MaximumSeats}
}
isListFromCache := !shouldListFromStorage(query, &listOptions)
// For watch requests, we want to adjust the cost only if they explicitly request
// sending initial events.
if requestInfo.Verb == "watch" {
if listOptions.SendInitialEvents == nil || !*listOptions.SendInitialEvents {
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
}
}
isListFromCache := requestInfo.Verb == "watch" || !shouldListFromStorage(query, &listOptions)
numStored, err := e.countGetterFn(key(requestInfo))
switch {

View File

@ -22,6 +22,9 @@ import (
"time"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
)
@ -105,6 +108,15 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN
switch requestInfo.Verb {
case "list":
return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
case "watch":
// WATCH supports `SendInitialEvents` option, which effectively means
// that is starts with sending of the contents of a corresponding LIST call.
// From that perspective, given that the watch only consumes APF seats
// during its initialization (sending init events), its cost should then
// be computed the same way as for a regular list.
if utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
}
case "create", "update", "patch", "delete":
return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
}

View File

@ -23,9 +23,14 @@ import (
"time"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
func TestWorkEstimator(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
defaultCfg := DefaultWorkEstimatorConfig()
minimumSeats := defaultCfg.MinimumSeats
maximumSeats := defaultCfg.MaximumSeats
@ -284,6 +289,45 @@ func TestWorkEstimator(t *testing.T) {
},
initialSeatsExpected: minimumSeats,
},
{
name: "request verb is watch, sendInitialEvents is nil",
requestURI: "http://server/apis/foo.bar/v1/events?watch=true",
requestInfo: &apirequest.RequestInfo{
Verb: "watch",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
initialSeatsExpected: minimumSeats,
},
{
name: "request verb is watch, sendInitialEvents is false",
requestURI: "http://server/apis/foo.bar/v1/events?watch=true&sendInitialEvents=false",
requestInfo: &apirequest.RequestInfo{
Verb: "watch",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
initialSeatsExpected: minimumSeats,
},
{
name: "request verb is watch, sendInitialEvents is true",
requestURI: "http://server/apis/foo.bar/v1/events?watch=true&sendInitialEvents=true",
requestInfo: &apirequest.RequestInfo{
Verb: "watch",
APIGroup: "foo.bar",
Resource: "events",
},
counts: map[string]int64{
"events.foo.bar": 799,
},
initialSeatsExpected: 8,
},
{
name: "request verb is create, no watches",
requestURI: "http://server/apis/foo.bar/v1/foos",