From c0030a4d27e0a30d89b1b0fddb32928942ca8085 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 27 Jun 2023 16:08:33 +0200 Subject: [PATCH] Add support for watchlist to APF --- .../request/list_work_estimator.go | 11 ++++- .../pkg/util/flowcontrol/request/width.go | 12 +++++ .../util/flowcontrol/request/width_test.go | 44 +++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 22f556d2506..f18dd301828 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -68,7 +68,16 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe // return maximumSeats for this request to be consistent. return WorkEstimate{InitialSeats: e.config.MaximumSeats} } - isListFromCache := !shouldListFromStorage(query, &listOptions) + + // For watch requests, we want to adjust the cost only if they explicitly request + // sending initial events. + if requestInfo.Verb == "watch" { + if listOptions.SendInitialEvents == nil || !*listOptions.SendInitialEvents { + return WorkEstimate{InitialSeats: e.config.MinimumSeats} + } + } + + isListFromCache := requestInfo.Verb == "watch" || !shouldListFromStorage(query, &listOptions) numStored, err := e.countGetterFn(key(requestInfo)) switch { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go index 86f0425843b..be119e5840d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go @@ -22,6 +22,9 @@ import ( "time" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog/v2" ) @@ -105,6 +108,15 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN switch requestInfo.Verb { case "list": return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) + case "watch": + // WATCH supports `SendInitialEvents` option, which effectively means + // that is starts with sending of the contents of a corresponding LIST call. + // From that perspective, given that the watch only consumes APF seats + // during its initialization (sending init events), its cost should then + // be computed the same way as for a regular list. + if utilfeature.DefaultFeatureGate.Enabled(features.WatchList) { + return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) + } case "create", "update", "patch", "delete": return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go index fb1e894e9eb..441a05565f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width_test.go @@ -23,9 +23,14 @@ import ( "time" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" ) func TestWorkEstimator(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() + defaultCfg := DefaultWorkEstimatorConfig() minimumSeats := defaultCfg.MinimumSeats maximumSeats := defaultCfg.MaximumSeats @@ -284,6 +289,45 @@ func TestWorkEstimator(t *testing.T) { }, initialSeatsExpected: minimumSeats, }, + { + name: "request verb is watch, sendInitialEvents is nil", + requestURI: "http://server/apis/foo.bar/v1/events?watch=true", + requestInfo: &apirequest.RequestInfo{ + Verb: "watch", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + initialSeatsExpected: minimumSeats, + }, + { + name: "request verb is watch, sendInitialEvents is false", + requestURI: "http://server/apis/foo.bar/v1/events?watch=true&sendInitialEvents=false", + requestInfo: &apirequest.RequestInfo{ + Verb: "watch", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + initialSeatsExpected: minimumSeats, + }, + { + name: "request verb is watch, sendInitialEvents is true", + requestURI: "http://server/apis/foo.bar/v1/events?watch=true&sendInitialEvents=true", + requestInfo: &apirequest.RequestInfo{ + Verb: "watch", + APIGroup: "foo.bar", + Resource: "events", + }, + counts: map[string]int64{ + "events.foo.bar": 799, + }, + initialSeatsExpected: 8, + }, { name: "request verb is create, no watches", requestURI: "http://server/apis/foo.bar/v1/foos",