From 12746f4bc15458d585ffd4c6e9d6066810e27361 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 28 Oct 2021 12:41:41 +0200 Subject: [PATCH 1/2] P&F: Update WatchTracker interface to pass more information --- .../pkg/server/filters/priority-and-fairness.go | 2 +- .../apiserver/pkg/util/flowcontrol/watch_tracker.go | 10 ++++++---- .../pkg/util/flowcontrol/watch_tracker_test.go | 8 ++++++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index fd5a3ab9d1a..8c6d2d12c6d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -186,7 +186,7 @@ func WithPriorityAndFairness( served = true setResponseHeaders(classification, w) - forgetWatch = fcIfc.RegisterWatch(requestInfo) + forgetWatch = fcIfc.RegisterWatch(r) // Notify the main thread that we're ready to start the watch. close(shouldStartWatchCh) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go index 4dc7c131eb0..18f89950676 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go @@ -17,6 +17,7 @@ limitations under the License. package flowcontrol import ( + "net/http" "sync" "k8s.io/apimachinery/pkg/util/sets" @@ -51,10 +52,10 @@ type ForgetWatchFunc func() // of watches in the system for the purpose of estimating the // cost of incoming mutating requests. type WatchTracker interface { - // RegisterWatch reqisters a watch with the provided requestInfo + // RegisterWatch reqisters a watch based on the provided http.Request // in the tracker. It returns the function that should be called // to forget the watcher once it is finished. - RegisterWatch(requestInfo *request.RequestInfo) ForgetWatchFunc + RegisterWatch(r *http.Request) ForgetWatchFunc // GetInterestedWatchCount returns the number of watches that are // potentially interested in a request with a given RequestInfo @@ -77,8 +78,9 @@ func NewWatchTracker() WatchTracker { } // RegisterWatch implements WatchTracker interface. -func (w *watchTracker) RegisterWatch(requestInfo *request.RequestInfo) ForgetWatchFunc { - if requestInfo == nil || requestInfo.Verb != "watch" { +func (w *watchTracker) RegisterWatch(r *http.Request) ForgetWatchFunc { + requestInfo, ok := request.RequestInfoFrom(r.Context()) + if !ok || requestInfo == nil || requestInfo.Verb != "watch" { return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go index fafccbe4255..3448b65b0f2 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go @@ -17,6 +17,7 @@ limitations under the License. package flowcontrol import ( + "context" "net/http" "net/url" "testing" @@ -107,8 +108,10 @@ func TestRegisterWatch(t *testing.T) { if err != nil { t.Fatalf("unexpected error from requestInfo creation: %#v", err) } + ctx := request.WithRequestInfo(context.Background(), requestInfo) + r := testCase.request.WithContext(ctx) - forget := watchTracker.RegisterWatch(requestInfo) + forget := watchTracker.RegisterWatch(r) if testCase.expected == nil { if forget != nil { t.Errorf("unexpected watch registered: %#v", watchTracker.watchCount) @@ -151,7 +154,8 @@ func TestGetInterestedWatchCount(t *testing.T) { if err != nil { t.Fatalf("unexpected error from requestInfo creation: %#v", err) } - if forget := watchTracker.RegisterWatch(requestInfo); forget == nil { + r := req.WithContext(request.WithRequestInfo(context.Background(), requestInfo)) + if forget := watchTracker.RegisterWatch(r); forget == nil { t.Errorf("watch wasn't registered: %#v", requestInfo) } } From 21ec77dc19047215094a538b717e24035ca000ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 28 Oct 2021 14:57:47 +0200 Subject: [PATCH 2/2] Enable support for indexes in WatchTracker --- .../pkg/util/flowcontrol/watch_tracker.go | 93 ++++++++++++++++++- .../util/flowcontrol/watch_tracker_test.go | 70 ++++++++++++++ 2 files changed, 158 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go index 18f89950676..5c64c702c1b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker.go @@ -20,8 +20,13 @@ import ( "net/http" "sync" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/request" + + "k8s.io/klog/v2" ) // readOnlyVerbs contains verbs for read-only requests. @@ -63,20 +68,66 @@ type WatchTracker interface { GetInterestedWatchCount(requestInfo *request.RequestInfo) int } +// builtinIndexes represents of set of indexes registered in +// watchcache that are indexing watches and increase speed of +// their processing. +// We define the indexes as a map from a resource to the path +// to the field in the object on which the index is built. +type builtinIndexes map[string]string + +func getBuiltinIndexes() builtinIndexes { + // The only existing indexes as of now are: + // - spec.nodeName for pods + // - metadata.Name for nodes, secrets and configmaps + // However, we can ignore the latter, because the requestInfo.Name + // is set for them (i.e. we already catch them correctly). + return map[string]string{ + "pods": "spec.nodeName", + } +} + // watchTracker tracks the number of watches in the system for // the purpose of estimating the cost of incoming mutating requests. type watchTracker struct { - lock sync.Mutex + // indexes represents a set of registered indexes. + // It can't change after creation. + indexes builtinIndexes + lock sync.Mutex watchCount map[watchIdentifier]int } func NewWatchTracker() WatchTracker { return &watchTracker{ + indexes: getBuiltinIndexes(), watchCount: make(map[watchIdentifier]int), } } +const ( + unsetValue = "" +) + +func getIndexValue(r *http.Request, field string) string { + opts := metainternalversion.ListOptions{} + if err := scheme.ParameterCodec.DecodeParameters(r.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil { + klog.Warningf("Couldn't parse list options for %v: %v", r.URL.Query(), err) + return unsetValue + } + if opts.FieldSelector == nil { + return unsetValue + } + if value, ok := opts.FieldSelector.RequiresExactMatch(field); ok { + return value + } + return unsetValue +} + +type indexValue struct { + resource string + value string +} + // RegisterWatch implements WatchTracker interface. func (w *watchTracker) RegisterWatch(r *http.Request) ForgetWatchFunc { requestInfo, ok := request.RequestInfoFrom(r.Context()) @@ -84,6 +135,14 @@ func (w *watchTracker) RegisterWatch(r *http.Request) ForgetWatchFunc { return nil } + var index *indexValue + if indexField, ok := w.indexes[requestInfo.Resource]; ok { + index = &indexValue{ + resource: requestInfo.Resource, + value: getIndexValue(r, indexField), + } + } + identifier := &watchIdentifier{ apiGroup: requestInfo.APIGroup, resource: requestInfo.Resource, @@ -93,16 +152,40 @@ func (w *watchTracker) RegisterWatch(r *http.Request) ForgetWatchFunc { w.lock.Lock() defer w.lock.Unlock() - w.watchCount[*identifier]++ - return w.forgetWatch(identifier) + w.updateIndexLocked(identifier, index, 1) + return w.forgetWatch(identifier, index) } -func (w *watchTracker) forgetWatch(identifier *watchIdentifier) ForgetWatchFunc { +func (w *watchTracker) updateIndexLocked(identifier *watchIdentifier, index *indexValue, incr int) { + if index == nil { + w.watchCount[*identifier] += incr + } else { + // For resources with defined index, for a given watch event we are + // only processing the watchers that: + // (a) do not specify field selector for an index field + // (b) do specify field selector with the value equal to the value + // coming from the processed object + // + // TODO(wojtek-t): For the sake of making progress and initially + // simplifying the implementation, we approximate (b) for all values + // as the value for an empty string. The assumption we're making here + // is that the difference between the actual number of watchers that + // will be processed, i.e. (a)+(b) above and the one from our + // approximation i.e. (a)+[(b) for field value of ""] will be small. + // This seem to be true in almost all production clusters, which makes + // it a reasonable first step simplification to unblock progres on it. + if index.value == unsetValue || index.value == "" { + w.watchCount[*identifier]++ + } + } +} + +func (w *watchTracker) forgetWatch(identifier *watchIdentifier, index *indexValue) ForgetWatchFunc { return func() { w.lock.Lock() defer w.lock.Unlock() - w.watchCount[*identifier]-- + w.updateIndexLocked(identifier, index, -1) if w.watchCount[*identifier] == 0 { delete(w.watchCount, *identifier) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go index 3448b65b0f2..0c5ba72544c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/watch_tracker_test.go @@ -257,3 +257,73 @@ func TestGetInterestedWatchCount(t *testing.T) { } } + +func TestGetInterestedWatchCountWithIndex(t *testing.T) { + watchTracker := NewWatchTracker() + + registeredWatches := []*http.Request{ + httpRequest("GET", "api/v1/pods", "watch=true"), + httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true"), + httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true&fieldSelector=metadata.name=mypod"), + httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true&fieldSelector=spec.nodeName"), + // The watches below will be ignored due to index. + httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true&fieldSelector=spec.nodeName=node1"), + httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true&fieldSelector=spec.nodeName=node2"), + } + requestInfoFactory := &request.RequestInfoFactory{ + APIPrefixes: sets.NewString("api", "apis"), + GrouplessAPIPrefixes: sets.NewString("api"), + } + for _, req := range registeredWatches { + requestInfo, err := requestInfoFactory.NewRequestInfo(req) + if err != nil { + t.Fatalf("unexpected error from requestInfo creation: %#v", err) + } + r := req.WithContext(request.WithRequestInfo(context.Background(), requestInfo)) + if forget := watchTracker.RegisterWatch(r); forget == nil { + t.Errorf("watch wasn't registered: %#v", requestInfo) + } + } + + testCases := []struct { + name string + request *http.Request + expected int + }{ + { + name: "pod creation in foo namespace", + request: httpRequest("POST", "/api/v1/namespaces/foo/pods", ""), + expected: 3, + }, + { + name: "mypod update in foo namespace", + request: httpRequest("PUT", "/api/v1/namespaces/foo/pods/mypod", ""), + expected: 4, + }, + { + name: "mypod patch in foo namespace", + request: httpRequest("PATCH", "/api/v1/namespaces/foo/pods/mypod", ""), + expected: 4, + }, + { + name: "mypod deletion in foo namespace", + request: httpRequest("DELETE", "/api/v1/namespaces/foo/pods/mypod", ""), + expected: 4, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + requestInfo, err := requestInfoFactory.NewRequestInfo(testCase.request) + if err != nil { + t.Fatalf("unexpected error from requestInfo creation: %#v", err) + } + + count := watchTracker.GetInterestedWatchCount(requestInfo) + if count != testCase.expected { + t.Errorf("unexpected interested watch count: %d, expected %d", count, testCase.expected) + } + }) + } + +}