From cea1dcfeed2fc4e8ab89cd43e5a0e402251c8df5 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 15 Jun 2021 10:49:42 +0200 Subject: [PATCH] Add watch tracker to APF for request cost estimation --- .../server/filters/priority-and-fairness.go | 8 + .../filters/priority-and-fairness_test.go | 24 +- .../pkg/util/flowcontrol/apf_controller.go | 4 + .../pkg/util/flowcontrol/apf_filter.go | 3 + .../pkg/util/flowcontrol/watch_tracker.go | 148 ++++++++++ .../util/flowcontrol/watch_tracker_test.go | 255 ++++++++++++++++++ 6 files changed, 433 insertions(+), 9 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 6b14b42e203..00b6c030179 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -114,6 +114,7 @@ func WithPriorityAndFairness( } } var resultCh chan interface{} + var forgetWatch utilflowcontrol.ForgetWatchFunc if isWatchRequest { resultCh = make(chan interface{}) } @@ -133,6 +134,8 @@ func WithPriorityAndFairness( } setResponseHeaders(classification, w) + forgetWatch = fcIfc.RegisterWatch(requestInfo) + if isWatchRequest { go func() { defer func() { @@ -163,6 +166,8 @@ func WithPriorityAndFairness( } // find the estimated "width" of the request + // TODO: Maybe just make it costEstimator and let it return additionalLatency too for the watch? + // TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter. 
width := widthEstimator.EstimateWidth(r) digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width} @@ -194,6 +199,9 @@ func WithPriorityAndFairness( // 1) finished being processed or // 2) rejected if isWatchRequest { + if forgetWatch != nil { + forgetWatch() + } err := <-resultCh if err != nil { panic(err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 166c8fdde6a..680109e8c43 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -76,6 +76,8 @@ type fakeApfFilter struct { mockDecision mockDecision postEnqueue func() postDequeue func() + + utilflowcontrol.WatchTracker } func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) { @@ -147,6 +149,7 @@ func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postE mockDecision: decision, postEnqueue: postEnqueue, postDequeue: postDequeue, + WatchTracker: utilflowcontrol.NewWatchTracker(), } return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute) } @@ -352,6 +355,15 @@ type fakeWatchApfFilter struct { lock sync.Mutex inflight int capacity int + + utilflowcontrol.WatchTracker +} + +func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { + return &fakeWatchApfFilter{ + capacity: capacity, + WatchTracker: utilflowcontrol.NewWatchTracker(), + } } func (f *fakeWatchApfFilter) Handle(ctx context.Context, @@ -432,9 +444,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { allRunning := sync.WaitGroup{} allRunning.Add(2 * concurrentRequests) - fakeFilter := &fakeWatchApfFilter{ - capacity: concurrentRequests, - } + fakeFilter := newFakeWatchApfFilter(concurrentRequests) onExecuteFunc := func() { firstRunning.Done() @@ -479,9 +489,7 @@ func 
TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) { } func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) { - fakeFilter := &fakeWatchApfFilter{ - capacity: 0, - } + fakeFilter := newFakeWatchApfFilter(0) onExecuteFunc := func() { t.Errorf("Request unexepectedly executing") @@ -497,9 +505,7 @@ func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) { } func TestApfWatchPanic(t *testing.T) { - fakeFilter := &fakeWatchApfFilter{ - capacity: 1, - } + fakeFilter := newFakeWatchApfFilter(1) onExecuteFunc := func() { panic("test panic") diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 90228584c6b..bd9fd9ad921 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -145,6 +145,9 @@ type configController struct { // to a given FlowSchema in any minute. // This may only be accessed from the one and only worker goroutine. mostRecentUpdates []updateAttempt + + // watchTracker implements the necessary WatchTracker interface. 
+ WatchTracker } type updateAttempt struct { @@ -192,6 +195,7 @@ func newTestableController(config TestableConfig) *configController { requestWaitLimit: config.RequestWaitLimit, flowcontrolClient: config.FlowcontrolClient, priorityLevelStates: make(map[string]*priorityLevelState), + WatchTracker: NewWatchTracker(), } klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager) // Start with longish delay because conflicts will be between diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 825ae09ce3d..f2e58a91bc2 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -66,6 +66,9 @@ type Interface interface { // Install installs debugging endpoints to the web-server. Install(c *mux.PathRecorderMux) + + // WatchTracker provides the WatchTracker interface. + WatchTracker } // This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go new file mode 100644 index 00000000000..4dc7c131eb0 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go @@ -0,0 +1,148 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "sync" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/request" +) + +// readOnlyVerbs contains verbs for read-only requests. +var readOnlyVerbs = sets.NewString("get", "list", "watch", "proxy") + +// watchIdentifier identifies a group of watches that are similar. +// As described in the "Priority and Fairness" KEP, we consider +// watches similar if they have the same resourceType, namespace +// and name. We ignore selectors as they have to be evaluated +// when processing an event anyway. +// +// TODO: For now we only track the number of watches registered +// in our kube-apiserver. Eventually we should consider sharing +// this information with other kube-apiserver as described in the +// KEP, but this isn't part of the first version. +type watchIdentifier struct { + apiGroup string + resource string + namespace string + name string +} + +// ForgetWatchFunc is a function that should be called to forget +// the previously registered watch from the watch tracker. +type ForgetWatchFunc func() + +// WatchTracker is an interface that allows tracking the number +// of watches in the system for the purpose of estimating the +// cost of incoming mutating requests. +type WatchTracker interface { + // RegisterWatch registers a watch with the provided requestInfo + // in the tracker. It returns the function that should be called + // to forget the watcher once it is finished.
+ RegisterWatch(requestInfo *request.RequestInfo) ForgetWatchFunc + + // GetInterestedWatchCount returns the number of watches that are + // potentially interested in a request with a given RequestInfo + // for the purpose of estimating cost of that request. + GetInterestedWatchCount(requestInfo *request.RequestInfo) int +} + +// watchTracker tracks the number of watches in the system for +// the purpose of estimating the cost of incoming mutating requests. +type watchTracker struct { + lock sync.Mutex + + watchCount map[watchIdentifier]int +} + +func NewWatchTracker() WatchTracker { + return &watchTracker{ + watchCount: make(map[watchIdentifier]int), + } +} + +// RegisterWatch implements WatchTracker interface. +func (w *watchTracker) RegisterWatch(requestInfo *request.RequestInfo) ForgetWatchFunc { + if requestInfo == nil || requestInfo.Verb != "watch" { + return nil + } + + identifier := &watchIdentifier{ + apiGroup: requestInfo.APIGroup, + resource: requestInfo.Resource, + namespace: requestInfo.Namespace, + name: requestInfo.Name, + } + + w.lock.Lock() + defer w.lock.Unlock() + w.watchCount[*identifier]++ + return w.forgetWatch(identifier) +} + +func (w *watchTracker) forgetWatch(identifier *watchIdentifier) ForgetWatchFunc { + return func() { + w.lock.Lock() + defer w.lock.Unlock() + + w.watchCount[*identifier]-- + if w.watchCount[*identifier] == 0 { + delete(w.watchCount, *identifier) + } + } +} + +// GetInterestedWatchCount implements WatchTracker interface. +// +// TODO(wojtek-t): As of now, requestInfo for object creation (POST) doesn't +// contain the Name field set. Figure out if we can somehow get it for the +// more accurate cost estimation. +// +// TODO(wojtek-t): Figure out how to approach DELETECOLLECTION calls. 
+func (w *watchTracker) GetInterestedWatchCount(requestInfo *request.RequestInfo) int { + if requestInfo == nil || readOnlyVerbs.Has(requestInfo.Verb) { + return 0 + } + + result := 0 + // The watches that we're interested in include: + // - watches for all objects of a resource type (no namespace and name specified) + // - watches for all objects of a resource type in the same namespace (no name specified) + // - watches interested in this particular object + identifier := &watchIdentifier{ + apiGroup: requestInfo.APIGroup, + resource: requestInfo.Resource, + } + + w.lock.Lock() + defer w.lock.Unlock() + + result += w.watchCount[*identifier] + + if requestInfo.Namespace != "" { + identifier.namespace = requestInfo.Namespace + result += w.watchCount[*identifier] + } + + if requestInfo.Name != "" { + identifier.name = requestInfo.Name + result += w.watchCount[*identifier] + } + + return result +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go new file mode 100644 index 00000000000..fafccbe4255 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go @@ -0,0 +1,255 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package flowcontrol + +import ( + "net/http" + "net/url" + "testing" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/request" +) + +func httpRequest(method, path, rawQuery string) *http.Request { + return &http.Request{ + Method: method, + URL: &url.URL{ + Path: path, + RawQuery: rawQuery, + }, + } +} + +func newWatchIdentifier(apiGroup, resource, namespace, name string) *watchIdentifier { + return &watchIdentifier{ + apiGroup: apiGroup, + resource: resource, + namespace: namespace, + name: name, + } +} + +func TestRegisterWatch(t *testing.T) { + testCases := []struct { + name string + request *http.Request + expected *watchIdentifier + }{ + { + name: "watch all objects", + request: httpRequest("GET", "/api/v1/pods", "watch=true"), + expected: newWatchIdentifier("", "pods", "", ""), + }, + { + name: "list all objects", + request: httpRequest("GET", "/api/v1/pods", ""), + expected: nil, + }, + { + name: "watch namespace-scoped objects", + request: httpRequest("GET", "/api/v1/namespaces/foo/pods", "watch=true"), + expected: newWatchIdentifier("", "pods", "foo", ""), + }, + { + name: "watch single object", + request: httpRequest("GET", "/api/v1/namespaces/foo/pods", "watch=true&fieldSelector=metadata.name=mypod"), + expected: newWatchIdentifier("", "pods", "foo", "mypod"), + }, + { + name: "watch single cluster-scoped object", + request: httpRequest("GET", "/api/v1/namespaces", "watch=true&fieldSelector=metadata.name=myns"), + expected: newWatchIdentifier("", "namespaces", "", "myns"), + }, + { + name: "watch all objects from api-group", + request: httpRequest("GET", "/apis/group/v1/pods", "watch=true"), + expected: newWatchIdentifier("group", "pods", "", ""), + }, + { + name: "watch namespace-scoped objects", + request: httpRequest("GET", "/apis/group/v1/namespaces/foo/pods", "watch=true"), + expected: newWatchIdentifier("group", "pods", "foo", ""), + }, + { + name: "watch single object", + request: httpRequest("GET", 
"/apis/group/v1/namespaces/foo/pods", "watch=true&fieldSelector=metadata.name=mypod"), + expected: newWatchIdentifier("group", "pods", "foo", "mypod"), + }, + } + + requestInfoFactory := &request.RequestInfoFactory{ + APIPrefixes: sets.NewString("api", "apis"), + GrouplessAPIPrefixes: sets.NewString("api"), + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + watchTracker := &watchTracker{ + watchCount: make(map[watchIdentifier]int), + } + + requestInfo, err := requestInfoFactory.NewRequestInfo(testCase.request) + if err != nil { + t.Fatalf("unexpected error from requestInfo creation: %#v", err) + } + + forget := watchTracker.RegisterWatch(requestInfo) + if testCase.expected == nil { + if forget != nil { + t.Errorf("unexpected watch registered: %#v", watchTracker.watchCount) + } + return + } + + if forget == nil { + t.Errorf("watch should be registered, got: %v", forget) + return + } + if count := watchTracker.watchCount[*testCase.expected]; count != 1 { + t.Errorf("unexpected watch registered: %#v", watchTracker.watchCount) + } + forget() + if count := watchTracker.watchCount[*testCase.expected]; count != 0 { + t.Errorf("forget should unregister the watch: %#v", watchTracker.watchCount) + } + }) + } +} + +func TestGetInterestedWatchCount(t *testing.T) { + watchTracker := NewWatchTracker() + + registeredWatches := []*http.Request{ + httpRequest("GET", "api/v1/pods", "watch=true"), + httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true"), + httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true&fieldSelector=metadata.name=mypod"), + httpRequest("GET", "api/v1/namespaces/bar/pods", "watch=true&fieldSelector=metadata.name=mypod"), + httpRequest("GET", "apis/group/v1/namespaces/foo/pods", "watch=true"), + httpRequest("GET", "apis/group/v1/namespaces/bar/pods", "watch=true&fieldSelector=metadata.name=mypod"), + } + requestInfoFactory := &request.RequestInfoFactory{ + APIPrefixes: sets.NewString("api", "apis"), + 
GrouplessAPIPrefixes: sets.NewString("api"), + } + for _, req := range registeredWatches { + requestInfo, err := requestInfoFactory.NewRequestInfo(req) + if err != nil { + t.Fatalf("unexpected error from requestInfo creation: %#v", err) + } + if forget := watchTracker.RegisterWatch(requestInfo); forget == nil { + t.Errorf("watch wasn't registered: %#v", requestInfo) + } + } + + testCases := []struct { + name string + request *http.Request + expected int + }{ + { + name: "pod creation in foo namespace", + request: httpRequest("POST", "/api/v1/namespaces/foo/pods", ""), + expected: 2, + }, + { + name: "mypod update in foo namespace", + request: httpRequest("PUT", "/api/v1/namespaces/foo/pods/mypod", ""), + expected: 3, + }, + { + name: "mypod patch in foo namespace", + request: httpRequest("PATCH", "/api/v1/namespaces/foo/pods/mypod", ""), + expected: 3, + }, + { + name: "mypod deletion in foo namespace", + request: httpRequest("DELETE", "/api/v1/namespaces/foo/pods/mypod", ""), + expected: 3, + }, + { + name: "otherpod update in foo namespace", + request: httpRequest("PUT", "/api/v1/namespaces/foo/pods/otherpod", ""), + expected: 2, + }, + { + name: "mypod get in foo namespace", + request: httpRequest("GET", "/api/v1/namespaces/foo/pods/mypod", ""), + expected: 0, + }, + { + name: "pods list in foo namespace", + request: httpRequest("GET", "/api/v1/namespaces/foo/pods", ""), + expected: 0, + }, + { + name: "pods watch in foo namespace", + request: httpRequest("GET", "/api/v1/namespaces/foo/pods", "watch=true"), + expected: 0, + }, + { + name: "pods proxy in foo namespace", + request: httpRequest("GET", "/api/v1/proxy/namespaces/foo/pods/mypod", ""), + expected: 0, + }, + { + name: "pod creation in bar namespace", + request: httpRequest("POST", "/api/v1/namespaces/bar/pods", ""), + expected: 1, + }, + { + name: "mypod update in bar namespace", + request: httpRequest("PUT", "/api/v1/namespaces/bar/pods/mypod", ""), + expected: 2, + }, + { + name: "mypod update in foo 
namespace in group group", + request: httpRequest("PUT", "/apis/group/v1/namespaces/foo/pods/mypod", ""), + expected: 1, + }, + { + name: "otherpod update in foo namespace in group group", + request: httpRequest("PUT", "/apis/group/v1/namespaces/foo/pods/otherpod", ""), + expected: 1, + }, + { + name: "mypod update in var namespace in group group", + request: httpRequest("PUT", "/apis/group/v1/namespaces/bar/pods/mypod", ""), + expected: 1, + }, + { + name: "otherpod update in bar namespace in group group", + request: httpRequest("PUT", "/apis/group/v1/namespaces/bar/pods/otherpod", ""), + expected: 0, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + requestInfo, err := requestInfoFactory.NewRequestInfo(testCase.request) + if err != nil { + t.Fatalf("unexpected error from requestInfo creation: %#v", err) + } + + count := watchTracker.GetInterestedWatchCount(requestInfo) + if count != testCase.expected { + t.Errorf("unexpected interested watch count: %d, expected %d", count, testCase.expected) + } + }) + } + +}