diff --git a/build/visible_to/BUILD b/build/visible_to/BUILD
index dd05b846c2b..a6951e4d86b 100644
--- a/build/visible_to/BUILD
+++ b/build/visible_to/BUILD
@@ -435,6 +435,7 @@ package_group(
         "//pkg/volume/util/operationexecutor",
         "//staging/src/k8s.io/apiserver/pkg/admission/metrics",
         "//staging/src/k8s.io/component-base/metrics/...",
+        "//test/e2e/apimachinery",
         "//test/e2e_node",
         "//test/integration/apiserver/flowcontrol",
         "//test/integration/metrics",
diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD
index b43da348371..361adf3db30 100644
--- a/test/e2e/apimachinery/BUILD
+++ b/test/e2e/apimachinery/BUILD
@@ -103,6 +103,8 @@ go_library(
         "//vendor/github.com/go-openapi/spec:go_default_library",
         "//vendor/github.com/onsi/ginkgo:go_default_library",
         "//vendor/github.com/onsi/gomega:go_default_library",
+        "//vendor/github.com/prometheus/common/expfmt:go_default_library",
+        "//vendor/github.com/prometheus/common/model:go_default_library",
        "//vendor/github.com/stretchr/testify/assert:go_default_library",
        "//vendor/k8s.io/kube-openapi/pkg/util:go_default_library",
        "//vendor/k8s.io/utils/pointer:go_default_library",
diff --git a/test/e2e/apimachinery/flowcontrol.go b/test/e2e/apimachinery/flowcontrol.go
index a0b9a7b2aa4..2ed7d5ae8df 100644
--- a/test/e2e/apimachinery/flowcontrol.go
+++ b/test/e2e/apimachinery/flowcontrol.go
@@ -17,18 +17,31 @@ limitations under the License.
 package apimachinery
 
 import (
+	"bytes"
 	"context"
+	"fmt"
+	"io"
 	"net/http"
+	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/onsi/ginkgo"
-	"k8s.io/client-go/rest"
+	"github.com/prometheus/common/expfmt"
+	"github.com/prometheus/common/model"
 
 	flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/rest"
 	"k8s.io/kubernetes/test/e2e/framework"
 )
 
-var _ = SIGDescribe("[Feature:APIPriorityAndFairness] response header should present", func() {
+const (
+	requestConcurrencyLimitMetricName      = "apiserver_flowcontrol_request_concurrency_limit"
+	requestConcurrencyLimitMetricLabelName = "priorityLevel"
+)
+
+var _ = SIGDescribe("[Feature:APIPriorityAndFairness]", func() {
 	f := framework.NewDefaultFramework("flowschemas")
 
 	ginkgo.It("should ensure that requests can be classified by testing flow-schemas/priority-levels", func() {
@@ -113,11 +126,158 @@ var _ = SIGDescribe("[Feature:APIPriorityAndFairness] response header should pre
 		}
 	})
 
+	ginkgo.It("should ensure that requests can't be drowned out", func() {
+		flowSchemaNamePrefix := "e2e-testing-flowschema"
+		priorityLevelNamePrefix := "e2e-testing-prioritylevel"
+		loadDuration := 10 * time.Second
+		type client struct {
+			username              string
+			qps                   float64
+			priorityLevelName     string
+			concurrencyMultiplier float64
+			concurrency           int32
+			flowSchemaName        string
+			matchingPrecedence    int32
+			completedRequests     int32
+		}
+		clients := []client{
+			// "elephant" refers to a client that creates requests at a much higher
+			// QPS than its counter-part and well above its concurrency share limit.
+			// In contrast, the mouse stays under its concurrency shares.
+			// Additionally, the "elephant" client also has a higher matching
+			// precedence for its flow schema.
+ {username: "elephant", qps: 100.0, concurrencyMultiplier: 2.0, matchingPrecedence: 999}, + {username: "mouse", qps: 5.0, concurrencyMultiplier: 0.5, matchingPrecedence: 1000}, + } + + ginkgo.By("creating test priority levels and flow schemas") + for i := range clients { + clients[i].priorityLevelName = fmt.Sprintf("%s-%s", priorityLevelNamePrefix, clients[i].username) + framework.Logf("creating PriorityLevel %q", clients[i].priorityLevelName) + _, err := f.ClientSet.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create( + context.TODO(), + &flowcontrolv1alpha1.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: clients[i].priorityLevelName, + }, + Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{ + Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited, + Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{ + AssuredConcurrencyShares: 1, + LimitResponse: flowcontrolv1alpha1.LimitResponse{ + Type: flowcontrolv1alpha1.LimitResponseTypeReject, + }, + }, + }, + }, + metav1.CreateOptions{}) + framework.ExpectNoError(err) + defer func(name string) { + framework.ExpectNoError(f.ClientSet.FlowcontrolV1alpha1().PriorityLevelConfigurations().Delete(context.TODO(), name, metav1.DeleteOptions{})) + }(clients[i].priorityLevelName) + clients[i].flowSchemaName = fmt.Sprintf("%s-%s", flowSchemaNamePrefix, clients[i].username) + framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName) + _, err = f.ClientSet.FlowcontrolV1alpha1().FlowSchemas().Create( + context.TODO(), + &flowcontrolv1alpha1.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: clients[i].flowSchemaName, + }, + Spec: flowcontrolv1alpha1.FlowSchemaSpec{ + MatchingPrecedence: clients[i].matchingPrecedence, + PriorityLevelConfiguration: flowcontrolv1alpha1.PriorityLevelConfigurationReference{ + Name: clients[i].priorityLevelName, + }, + DistinguisherMethod: &flowcontrolv1alpha1.FlowDistinguisherMethod{ + Type: flowcontrolv1alpha1.FlowDistinguisherMethodByUserType, + }, + Rules: []flowcontrolv1alpha1.PolicyRulesWithSubjects{ + { + Subjects: []flowcontrolv1alpha1.Subject{ + { + Kind: flowcontrolv1alpha1.SubjectKindUser, + User: &flowcontrolv1alpha1.UserSubject{ + Name: clients[i].username, + }, + }, + }, + NonResourceRules: []flowcontrolv1alpha1.NonResourcePolicyRule{ + { + Verbs: []string{flowcontrolv1alpha1.VerbAll}, + NonResourceURLs: []string{flowcontrolv1alpha1.NonResourceAll}, + }, + }, + }, + }, + }, + }, + metav1.CreateOptions{}) + framework.ExpectNoError(err) + defer func(name string) { + framework.ExpectNoError(f.ClientSet.FlowcontrolV1alpha1().FlowSchemas().Delete(context.TODO(), name, metav1.DeleteOptions{})) + }(clients[i].flowSchemaName) + } + + ginkgo.By("getting request concurrency from metrics") + for i := range clients { + resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO()) + framework.ExpectNoError(err) + sampleDecoder := expfmt.SampleDecoder{ + Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText), + Opts: &expfmt.DecodeOptions{}, + } + for { + var v model.Vector + err := sampleDecoder.Decode(&v) + if err == io.EOF { + break + } + framework.ExpectNoError(err) + for _, metric := range v { + if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName { + continue + } + if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != clients[i].priorityLevelName { + continue + } + clients[i].concurrency = int32(float64(metric.Value) * clients[i].concurrencyMultiplier) + if 
+						clients[i].concurrency = 1
+					}
+					framework.Logf("request concurrency for %q will be %d (concurrency share = %d)", clients[i].username, clients[i].concurrency, int32(metric.Value))
+				}
+			}
+		}
+
+		ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
+		var wg sync.WaitGroup
+		for i := range clients {
+			wg.Add(1)
+			go func(c *client) {
+				defer wg.Done()
+				framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps)
+				c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration)
+			}(&clients[i])
+		}
+		wg.Wait()
+
+		ginkgo.By("checking completed requests with expected values")
+		for _, client := range clients {
+			// Each client should have completed at least 95% of its ideal number of requests.
+			maxCompletedRequests := float64(client.concurrency) * client.qps * float64(loadDuration/time.Second)
+			fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
+			framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
+			if fractionCompleted < 0.95 {
+				framework.Failf("client %q: got %.1f%% completed requests, want at least 95%%", client.username, 100*fractionCompleted)
+			}
+		}
+	})
 })
 
-func testResponseHeaderMatches(f *framework.Framework, impersonatingUser, plUID, fsUID string) bool {
+// makeRequest creates a request to the API server as the given username and returns the response.
+func makeRequest(f *framework.Framework, username string) *http.Response {
 	config := rest.CopyConfig(f.ClientConfig())
-	config.Impersonate.UserName = impersonatingUser
+	config.Impersonate.UserName = username
 
 	roundTripper, err := rest.TransportFor(config)
 	framework.ExpectNoError(err)
@@ -126,7 +286,11 @@ func testResponseHeaderMatches(f *framework.Framework, impersonatingUser, plUID,
 	response, err := roundTripper.RoundTrip(req)
 	framework.ExpectNoError(err)
+	return response
+}
 
+func testResponseHeaderMatches(f *framework.Framework, impersonatingUser, plUID, fsUID string) bool {
+	response := makeRequest(f, impersonatingUser)
 	if response.Header.Get(flowcontrolv1alpha1.ResponseHeaderMatchedFlowSchemaUID) != fsUID {
 		return false
 	}
 
@@ -135,3 +299,56 @@ func testResponseHeaderMatches(f *framework.Framework, impersonatingUser, plUID,
 	}
 	return true
 }
+
+// uniformQPSLoadSingle loads the API server with requests from a single
+// client at a uniform rate defined by qps for the given loadDuration. The
+// number of successfully completed requests is returned.
+func uniformQPSLoadSingle(f *framework.Framework, username string, qps float64, loadDuration time.Duration) int32 {
+	var completed int32
+	var wg sync.WaitGroup
+	ticker := time.NewTicker(time.Duration(1e9/qps) * time.Nanosecond)
+	defer ticker.Stop()
+	timer := time.NewTimer(loadDuration)
+	for {
+		select {
+		case <-ticker.C:
+			wg.Add(1)
+			// Each request will have a non-zero latency. In addition, there may be
+			// multiple concurrent requests in-flight. As a result, a request may
+			// take longer than the time between two consecutive ticks, regardless
+			// of whether it is accepted or rejected. For example, when a client
+			// makes requests far above its concurrency share, with little time
+			// between consecutive requests, the limited concurrency means newer
+			// requests are enqueued until older ones complete. Hence the
+			// synchronization with sync.WaitGroup.
+			go func() {
+				defer wg.Done()
+				makeRequest(f, username)
+				atomic.AddInt32(&completed, 1)
+			}()
+		case <-timer.C:
+			// Requests still in flight should not contribute to the completed count.
+			totalCompleted := atomic.LoadInt32(&completed)
+			wg.Wait() // do not leak goroutines
+			return totalCompleted
+		}
+	}
+}
+
+// uniformQPSLoadConcurrent loads the API server with concurrency clients, each
+// impersonating username and creating requests at a uniform rate defined by
+// qps. The sum of the number of successfully completed requests across all
+// concurrent clients is returned.
+func uniformQPSLoadConcurrent(f *framework.Framework, username string, concurrency int32, qps float64, loadDuration time.Duration) int32 {
+	var completed int32
+	var wg sync.WaitGroup
+	wg.Add(int(concurrency))
+	for i := int32(0); i < concurrency; i++ {
+		go func() {
+			defer wg.Done()
+			atomic.AddInt32(&completed, uniformQPSLoadSingle(f, username, qps, loadDuration))
+		}()
+	}
+	wg.Wait()
+	return atomic.LoadInt32(&completed)
+}