Merge pull request #103481 from wojtek-t/pf_watch_tracker

Add watch tracker to APF for request cost estimation
This commit is contained in:
Kubernetes Prow Robot 2021-07-07 10:44:06 -07:00 committed by GitHub
commit b93cd81609
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 433 additions and 9 deletions

View File

@ -114,6 +114,7 @@ func WithPriorityAndFairness(
}
}
var resultCh chan interface{}
var forgetWatch utilflowcontrol.ForgetWatchFunc
if isWatchRequest {
resultCh = make(chan interface{})
}
@ -133,6 +134,8 @@ func WithPriorityAndFairness(
}
setResponseHeaders(classification, w)
forgetWatch = fcIfc.RegisterWatch(requestInfo)
if isWatchRequest {
go func() {
defer func() {
@ -163,6 +166,8 @@ func WithPriorityAndFairness(
}
// find the estimated "width" of the request
// TODO: Maybe just make it costEstimator and let it return additionalLatency too for the watch?
// TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
width := widthEstimator.EstimateWidth(r)
digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width}
@ -194,6 +199,9 @@ func WithPriorityAndFairness(
// 1) finished being processed or
// 2) rejected
if isWatchRequest {
if forgetWatch != nil {
forgetWatch()
}
err := <-resultCh
if err != nil {
panic(err)

View File

@ -76,6 +76,8 @@ type fakeApfFilter struct {
mockDecision mockDecision
postEnqueue func()
postDequeue func()
utilflowcontrol.WatchTracker
}
func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {
@ -147,6 +149,7 @@ func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postE
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
WatchTracker: utilflowcontrol.NewWatchTracker(),
}
return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
}
@ -352,6 +355,15 @@ type fakeWatchApfFilter struct {
lock sync.Mutex
inflight int
capacity int
utilflowcontrol.WatchTracker
}
func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
return &fakeWatchApfFilter{
capacity: capacity,
WatchTracker: utilflowcontrol.NewWatchTracker(),
}
}
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
@ -432,9 +444,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
allRunning := sync.WaitGroup{}
allRunning.Add(2 * concurrentRequests)
fakeFilter := &fakeWatchApfFilter{
capacity: concurrentRequests,
}
fakeFilter := newFakeWatchApfFilter(concurrentRequests)
onExecuteFunc := func() {
firstRunning.Done()
@ -479,9 +489,7 @@ func TestApfExecuteWatchRequestsWithInitializationSignal(t *testing.T) {
}
func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
fakeFilter := &fakeWatchApfFilter{
capacity: 0,
}
fakeFilter := newFakeWatchApfFilter(0)
onExecuteFunc := func() {
t.Errorf("Request unexepectedly executing")
@ -497,9 +505,7 @@ func TestApfRejectWatchRequestsWithInitializationSignal(t *testing.T) {
}
func TestApfWatchPanic(t *testing.T) {
fakeFilter := &fakeWatchApfFilter{
capacity: 1,
}
fakeFilter := newFakeWatchApfFilter(1)
onExecuteFunc := func() {
panic("test panic")

View File

@ -145,6 +145,9 @@ type configController struct {
// to a given FlowSchema in any minute.
// This may only be accessed from the one and only worker goroutine.
mostRecentUpdates []updateAttempt
// watchTracker implements the necessary WatchTracker interface.
WatchTracker
}
type updateAttempt struct {
@ -192,6 +195,7 @@ func newTestableController(config TestableConfig) *configController {
requestWaitLimit: config.RequestWaitLimit,
flowcontrolClient: config.FlowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState),
WatchTracker: NewWatchTracker(),
}
klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
// Start with longish delay because conflicts will be between

View File

@ -66,6 +66,9 @@ type Interface interface {
// Install installs debugging endpoints to the web-server.
Install(c *mux.PathRecorderMux)
// WatchTracker provides the WatchTracker interface.
WatchTracker
}
// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md

View File

@ -0,0 +1,148 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
)
// readOnlyVerbs contains verbs for read-only requests.
var readOnlyVerbs = sets.NewString("get", "list", "watch", "proxy")
// watchIdentifier identifies group of watches that are similar.
// As described in the "Priority and Fairness" KEP, we consider
// watches similar if they have the same resourceType, namespace
// and name. We ignore selectors as they have to be evaluated
// when processing an even anyway.
//
// TODO: For now we only track the number of watches registered
// in our kube-apiserver. Eventually we should consider sharing
// this information with other kube-apiserver as described in the
// KEP, but this isn't part of the first version.
type watchIdentifier struct {
apiGroup string
resource string
namespace string
name string
}
// ForgetWatchFunc is a function that should be called to forget
// the previously registered watch from the watch tracker.
type ForgetWatchFunc func()
// WatchTracker is an interface that allows tracking the number
// of watches in the system for the purpose of estimating the
// cost of incoming mutating requests.
type WatchTracker interface {
// RegisterWatch reqisters a watch with the provided requestInfo
// in the tracker. It returns the function that should be called
// to forget the watcher once it is finished.
RegisterWatch(requestInfo *request.RequestInfo) ForgetWatchFunc
// GetInterestedWatchCount returns the number of watches that are
// potentially interested in a request with a given RequestInfo
// for the purpose of estimating cost of that request.
GetInterestedWatchCount(requestInfo *request.RequestInfo) int
}
// watchTracker tracks the number of watches in the system for
// the purpose of estimating the cost of incoming mutating requests.
type watchTracker struct {
lock sync.Mutex
watchCount map[watchIdentifier]int
}
func NewWatchTracker() WatchTracker {
return &watchTracker{
watchCount: make(map[watchIdentifier]int),
}
}
// RegisterWatch implements WatchTracker interface.
func (w *watchTracker) RegisterWatch(requestInfo *request.RequestInfo) ForgetWatchFunc {
if requestInfo == nil || requestInfo.Verb != "watch" {
return nil
}
identifier := &watchIdentifier{
apiGroup: requestInfo.APIGroup,
resource: requestInfo.Resource,
namespace: requestInfo.Namespace,
name: requestInfo.Name,
}
w.lock.Lock()
defer w.lock.Unlock()
w.watchCount[*identifier]++
return w.forgetWatch(identifier)
}
func (w *watchTracker) forgetWatch(identifier *watchIdentifier) ForgetWatchFunc {
return func() {
w.lock.Lock()
defer w.lock.Unlock()
w.watchCount[*identifier]--
if w.watchCount[*identifier] == 0 {
delete(w.watchCount, *identifier)
}
}
}
// GetInterestedWatchCount implements WatchTracker interface.
//
// TODO(wojtek-t): As of now, requestInfo for object creation (POST) doesn't
// contain the Name field set. Figure out if we can somehow get it for the
// more accurate cost estimation.
//
// TODO(wojtek-t): Figure out how to approach DELETECOLLECTION calls.
func (w *watchTracker) GetInterestedWatchCount(requestInfo *request.RequestInfo) int {
if requestInfo == nil || readOnlyVerbs.Has(requestInfo.Verb) {
return 0
}
result := 0
// The watches that we're interested in include:
// - watches for all objects of a resource type (no namespace and name specified)
// - watches for all objects of a resource type in the same namespace (no name specified)
// - watched interested in this particular object
identifier := &watchIdentifier{
apiGroup: requestInfo.APIGroup,
resource: requestInfo.Resource,
}
w.lock.Lock()
defer w.lock.Unlock()
result += w.watchCount[*identifier]
if requestInfo.Namespace != "" {
identifier.namespace = requestInfo.Namespace
result += w.watchCount[*identifier]
}
if requestInfo.Name != "" {
identifier.name = requestInfo.Name
result += w.watchCount[*identifier]
}
return result
}

View File

@ -0,0 +1,255 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"net/http"
"net/url"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
)
func httpRequest(method, path, rawQuery string) *http.Request {
return &http.Request{
Method: method,
URL: &url.URL{
Path: path,
RawQuery: rawQuery,
},
}
}
func newWatchIdentifier(apiGroup, resource, namespace, name string) *watchIdentifier {
return &watchIdentifier{
apiGroup: apiGroup,
resource: resource,
namespace: namespace,
name: name,
}
}
func TestRegisterWatch(t *testing.T) {
testCases := []struct {
name string
request *http.Request
expected *watchIdentifier
}{
{
name: "watch all objects",
request: httpRequest("GET", "/api/v1/pods", "watch=true"),
expected: newWatchIdentifier("", "pods", "", ""),
},
{
name: "list all objects",
request: httpRequest("GET", "/api/v1/pods", ""),
expected: nil,
},
{
name: "watch namespace-scoped objects",
request: httpRequest("GET", "/api/v1/namespaces/foo/pods", "watch=true"),
expected: newWatchIdentifier("", "pods", "foo", ""),
},
{
name: "watch single object",
request: httpRequest("GET", "/api/v1/namespaces/foo/pods", "watch=true&fieldSelector=metadata.name=mypod"),
expected: newWatchIdentifier("", "pods", "foo", "mypod"),
},
{
name: "watch single cluster-scoped object",
request: httpRequest("GET", "/api/v1/namespaces", "watch=true&fieldSelector=metadata.name=myns"),
expected: newWatchIdentifier("", "namespaces", "", "myns"),
},
{
name: "watch all objects from api-group",
request: httpRequest("GET", "/apis/group/v1/pods", "watch=true"),
expected: newWatchIdentifier("group", "pods", "", ""),
},
{
name: "watch namespace-scoped objects",
request: httpRequest("GET", "/apis/group/v1/namespaces/foo/pods", "watch=true"),
expected: newWatchIdentifier("group", "pods", "foo", ""),
},
{
name: "watch single object",
request: httpRequest("GET", "/apis/group/v1/namespaces/foo/pods", "watch=true&fieldSelector=metadata.name=mypod"),
expected: newWatchIdentifier("group", "pods", "foo", "mypod"),
},
}
requestInfoFactory := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
watchTracker := &watchTracker{
watchCount: make(map[watchIdentifier]int),
}
requestInfo, err := requestInfoFactory.NewRequestInfo(testCase.request)
if err != nil {
t.Fatalf("unexpected error from requestInfo creation: %#v", err)
}
forget := watchTracker.RegisterWatch(requestInfo)
if testCase.expected == nil {
if forget != nil {
t.Errorf("unexpected watch registered: %#v", watchTracker.watchCount)
}
return
}
if forget == nil {
t.Errorf("watch should be registered, got: %v", forget)
return
}
if count := watchTracker.watchCount[*testCase.expected]; count != 1 {
t.Errorf("unexpected watch registered: %#v", watchTracker.watchCount)
}
forget()
if count := watchTracker.watchCount[*testCase.expected]; count != 0 {
t.Errorf("forget should unregister the watch: %#v", watchTracker.watchCount)
}
})
}
}
func TestGetInterestedWatchCount(t *testing.T) {
watchTracker := NewWatchTracker()
registeredWatches := []*http.Request{
httpRequest("GET", "api/v1/pods", "watch=true"),
httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true"),
httpRequest("GET", "api/v1/namespaces/foo/pods", "watch=true&fieldSelector=metadata.name=mypod"),
httpRequest("GET", "api/v1/namespaces/bar/pods", "watch=true&fieldSelector=metadata.name=mypod"),
httpRequest("GET", "apis/group/v1/namespaces/foo/pods", "watch=true"),
httpRequest("GET", "apis/group/v1/namespaces/bar/pods", "watch=true&fieldSelector=metadata.name=mypod"),
}
requestInfoFactory := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
for _, req := range registeredWatches {
requestInfo, err := requestInfoFactory.NewRequestInfo(req)
if err != nil {
t.Fatalf("unexpected error from requestInfo creation: %#v", err)
}
if forget := watchTracker.RegisterWatch(requestInfo); forget == nil {
t.Errorf("watch wasn't registered: %#v", requestInfo)
}
}
testCases := []struct {
name string
request *http.Request
expected int
}{
{
name: "pod creation in foo namespace",
request: httpRequest("POST", "/api/v1/namespaces/foo/pods", ""),
expected: 2,
},
{
name: "mypod update in foo namespace",
request: httpRequest("PUT", "/api/v1/namespaces/foo/pods/mypod", ""),
expected: 3,
},
{
name: "mypod patch in foo namespace",
request: httpRequest("PATCH", "/api/v1/namespaces/foo/pods/mypod", ""),
expected: 3,
},
{
name: "mypod deletion in foo namespace",
request: httpRequest("DELETE", "/api/v1/namespaces/foo/pods/mypod", ""),
expected: 3,
},
{
name: "otherpod update in foo namespace",
request: httpRequest("PUT", "/api/v1/namespaces/foo/pods/otherpod", ""),
expected: 2,
},
{
name: "mypod get in foo namespace",
request: httpRequest("GET", "/api/v1/namespaces/foo/pods/mypod", ""),
expected: 0,
},
{
name: "pods list in foo namespace",
request: httpRequest("GET", "/api/v1/namespaces/foo/pods", ""),
expected: 0,
},
{
name: "pods watch in foo namespace",
request: httpRequest("GET", "/api/v1/namespaces/foo/pods", "watch=true"),
expected: 0,
},
{
name: "pods proxy in foo namespace",
request: httpRequest("GET", "/api/v1/proxy/namespaces/foo/pods/mypod", ""),
expected: 0,
},
{
name: "pod creation in bar namespace",
request: httpRequest("POST", "/api/v1/namespaces/bar/pods", ""),
expected: 1,
},
{
name: "mypod update in bar namespace",
request: httpRequest("PUT", "/api/v1/namespaces/bar/pods/mypod", ""),
expected: 2,
},
{
name: "mypod update in foo namespace in group group",
request: httpRequest("PUT", "/apis/group/v1/namespaces/foo/pods/mypod", ""),
expected: 1,
},
{
name: "otherpod update in foo namespace in group group",
request: httpRequest("PUT", "/apis/group/v1/namespaces/foo/pods/otherpod", ""),
expected: 1,
},
{
name: "mypod update in var namespace in group group",
request: httpRequest("PUT", "/apis/group/v1/namespaces/bar/pods/mypod", ""),
expected: 1,
},
{
name: "otherpod update in bar namespace in group group",
request: httpRequest("PUT", "/apis/group/v1/namespaces/bar/pods/otherpod", ""),
expected: 0,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
requestInfo, err := requestInfoFactory.NewRequestInfo(testCase.request)
if err != nil {
t.Fatalf("unexpected error from requestInfo creation: %#v", err)
}
count := watchTracker.GetInterestedWatchCount(requestInfo)
if count != testCase.expected {
t.Errorf("unexpected interested watch count: %d, expected %d", count, testCase.expected)
}
})
}
}