diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index 91421fe07e8..600410f8bb8 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -72,6 +72,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library", diff --git a/test/e2e/apimachinery/flowcontrol.go b/test/e2e/apimachinery/flowcontrol.go index 1fb0340973b..2e514b51911 100644 --- a/test/e2e/apimachinery/flowcontrol.go +++ b/test/e2e/apimachinery/flowcontrol.go @@ -32,13 +32,14 @@ import ( flowcontrol "k8s.io/api/flowcontrol/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/authentication/user" "k8s.io/client-go/rest" "k8s.io/kubernetes/test/e2e/framework" ) const ( requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit" - requestConcurrencyLimitMetricLabelName = "priorityLevel" + requestConcurrencyLimitMetricLabelName = "priority_level" ) var _ = SIGDescribe("API priority and fairness", func() { @@ -51,73 +52,14 @@ var _ = SIGDescribe("API priority and fairness", func() { nonMatchingUsername := "foo" ginkgo.By("creating a testing prioritylevel") - createdPriorityLevel, err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Create( - context.TODO(), - &flowcontrol.PriorityLevelConfiguration{ - ObjectMeta: metav1.ObjectMeta{ - Name: testingPriorityLevelName, - }, - Spec: flowcontrol.PriorityLevelConfigurationSpec{ - Type: flowcontrol.PriorityLevelEnablementLimited, - Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ - AssuredConcurrencyShares: 1, // will have at minimum 1 concurrency share - LimitResponse: flowcontrol.LimitResponse{ - Type: flowcontrol.LimitResponseTypeReject, - }, - }, - }, - }, - metav1.CreateOptions{}) - framework.ExpectNoError(err) - - defer func() { - // clean-ups - err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Delete(context.TODO(), testingPriorityLevelName, metav1.DeleteOptions{}) - framework.ExpectNoError(err) - err = f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Delete(context.TODO(), testingFlowSchemaName, metav1.DeleteOptions{}) - framework.ExpectNoError(err) - }() + createdPriorityLevel, cleanup := createPriorityLevel(f, testingPriorityLevelName, 1) + defer cleanup() ginkgo.By("creating a testing flowschema") - createdFlowSchema, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Create( - context.TODO(), - &flowcontrol.FlowSchema{ - ObjectMeta: metav1.ObjectMeta{ - Name: testingFlowSchemaName, - }, - Spec: flowcontrol.FlowSchemaSpec{ - MatchingPrecedence: 1000, // a rather higher precedence to ensure it make effect - PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ - Name: testingPriorityLevelName, - }, - DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{ - Type: flowcontrol.FlowDistinguisherMethodByUserType, - }, - Rules: []flowcontrol.PolicyRulesWithSubjects{ - { - Subjects: []flowcontrol.Subject{ - { - Kind: flowcontrol.SubjectKindUser, - User: &flowcontrol.UserSubject{ - Name: matchingUsername, - }, - }, - }, - NonResourceRules: []flowcontrol.NonResourcePolicyRule{ - { - Verbs: []string{flowcontrol.VerbAll}, - NonResourceURLs: []string{flowcontrol.NonResourceAll}, - }, - }, - }, - }, - }, - }, - metav1.CreateOptions{}) - framework.ExpectNoError(err) - - ginkgo.By("response headers should contain flow-schema/priority-level uid") + createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, matchingUsername) + defer cleanup() + ginkgo.By("checking response headers contain flow-schema/priority-level uid") if !testResponseHeaderMatches(f, matchingUsername, string(createdPriorityLevel.UID), string(createdFlowSchema.UID)) { framework.Failf("matching user doesnt received UID for the testing priority-level and flow-schema") } @@ -126,7 +68,12 @@ var _ = SIGDescribe("API priority and fairness", func() { } }) - ginkgo.It("should ensure that requests can't be drowned out", func() { + // This test creates two flow schemas and a corresponding priority level for + // each flow schema. One flow schema has a higher match precedence. With two + // clients making requests at different rates, we test to make sure that the + // higher QPS client cannot drown out the other one despite having higher + // priority. + ginkgo.It("should ensure that requests can't be drowned out (priority)", func() { flowSchemaNamePrefix := "e2e-testing-flowschema" priorityLevelNamePrefix := "e2e-testing-prioritylevel" loadDuration := 10 * time.Second @@ -141,113 +88,101 @@ var _ = SIGDescribe("API priority and fairness", func() { completedRequests int32 } clients := []client{ - // "elephant" refers to a client that creates requests at a much higher + // "highqps" refers to a client that creates requests at a much higher // QPS than its counter-part and well above its concurrency share limit. - // In contrast, the mouse stays under its concurrency shares. - // Additionally, the "elephant" client also has a higher matching + // In contrast, "lowqps" stays under its concurrency shares. + // Additionally, the "highqps" client also has a higher matching // precedence for its flow schema. - {username: "elephant", qps: 100.0, concurrencyMultiplier: 2.0, matchingPrecedence: 999}, - {username: "mouse", qps: 5.0, concurrencyMultiplier: 0.5, matchingPrecedence: 1000}, + {username: "highqps", qps: 100.0, concurrencyMultiplier: 2.0, matchingPrecedence: 999}, + {username: "lowqps", qps: 5.0, concurrencyMultiplier: 0.5, matchingPrecedence: 1000}, } ginkgo.By("creating test priority levels and flow schemas") for i := range clients { clients[i].priorityLevelName = fmt.Sprintf("%s-%s", priorityLevelNamePrefix, clients[i].username) framework.Logf("creating PriorityLevel %q", clients[i].priorityLevelName) - _, err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Create( - context.TODO(), - &flowcontrol.PriorityLevelConfiguration{ - ObjectMeta: metav1.ObjectMeta{ - Name: clients[i].priorityLevelName, - }, - Spec: flowcontrol.PriorityLevelConfigurationSpec{ - Type: flowcontrol.PriorityLevelEnablementLimited, - Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ - AssuredConcurrencyShares: 1, - LimitResponse: flowcontrol.LimitResponse{ - Type: flowcontrol.LimitResponseTypeReject, - }, - }, - }, - }, - metav1.CreateOptions{}) - framework.ExpectNoError(err) - defer func(name string) { - framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Delete(context.TODO(), name, metav1.DeleteOptions{})) - }(clients[i].priorityLevelName) + _, cleanup := createPriorityLevel(f, clients[i].priorityLevelName, 1) + defer cleanup() + clients[i].flowSchemaName = fmt.Sprintf("%s-%s", flowSchemaNamePrefix, clients[i].username) framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName) - _, err = f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Create( - context.TODO(), - &flowcontrol.FlowSchema{ - ObjectMeta: metav1.ObjectMeta{ - Name: clients[i].flowSchemaName, - }, - Spec: flowcontrol.FlowSchemaSpec{ - MatchingPrecedence: clients[i].matchingPrecedence, - PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ - Name: clients[i].priorityLevelName, - }, - DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{ - Type: flowcontrol.FlowDistinguisherMethodByUserType, - }, - Rules: []flowcontrol.PolicyRulesWithSubjects{ - { - Subjects: []flowcontrol.Subject{ - { - Kind: flowcontrol.SubjectKindUser, - User: &flowcontrol.UserSubject{ - Name: clients[i].username, - }, - }, - }, - NonResourceRules: []flowcontrol.NonResourcePolicyRule{ - { - Verbs: []string{flowcontrol.VerbAll}, - NonResourceURLs: []string{flowcontrol.NonResourceAll}, - }, - }, - }, - }, - }, - }, - metav1.CreateOptions{}) - framework.ExpectNoError(err) - defer func(name string) { - framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Delete(context.TODO(), name, metav1.DeleteOptions{})) - }(clients[i].flowSchemaName) + _, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, clients[i].username) + defer cleanup() } ginkgo.By("getting request concurrency from metrics") for i := range clients { - resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO()) - framework.ExpectNoError(err) - sampleDecoder := expfmt.SampleDecoder{ - Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText), - Opts: &expfmt.DecodeOptions{}, + realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName) + clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier) + if clients[i].concurrency < 1 { + clients[i].concurrency = 1 } - for { - var v model.Vector - err := sampleDecoder.Decode(&v) - if err == io.EOF { - break - } - framework.ExpectNoError(err) - for _, metric := range v { - if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName { - continue - } - if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != clients[i].priorityLevelName { - continue - } - clients[i].concurrency = int32(float64(metric.Value) * clients[i].concurrencyMultiplier) - if clients[i].concurrency < 1 { - clients[i].concurrency = 1 - } - framework.Logf("request concurrency for %q will be %d (concurrency share = %d)", clients[i].username, clients[i].concurrency, int32(metric.Value)) - } + framework.Logf("request concurrency for %q will be %d (concurrency share = %d)", clients[i].username, clients[i].concurrency, realConcurrency) + } + + ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String())) + var wg sync.WaitGroup + for i := range clients { + wg.Add(1) + go func(c *client) { + defer wg.Done() + framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps) + c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration) + }(&clients[i]) + } + wg.Wait() + + ginkgo.By("checking completed requests with expected values") + for _, client := range clients { + // Each client should have 95% of its ideal number of completed requests. + maxCompletedRequests := float64(client.concurrency) * client.qps * float64(loadDuration/time.Second) + fractionCompleted := float64(client.completedRequests) / maxCompletedRequests + framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted) + if fractionCompleted < 0.95 { + framework.Failf("client %q: got %.1f%% completed requests, want at least 95%%", client.username, 100*fractionCompleted) } } + }) + + // This test has two clients (different usernames) making requests at + // different rates. Both clients' requests get mapped to the same flow schema + // and priority level. We expect APF's "ByUser" flow distinguisher to isolate + // the two clients and not allow one client to drown out the other despite + // having a higher QPS. + ginkgo.It("should ensure that requests can't be drowned out (fairness)", func() { + priorityLevelName := "e2e-testing-prioritylevel" + flowSchemaName := "e2e-testing-flowschema" + loadDuration := 10 * time.Second + + framework.Logf("creating PriorityLevel %q", priorityLevelName) + _, cleanup := createPriorityLevel(f, priorityLevelName, 1) + defer cleanup() + + framework.Logf("creating FlowSchema %q", flowSchemaName) + _, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, "*") + defer cleanup() + + type client struct { + username string + qps float64 + concurrencyMultiplier float64 + concurrency int32 + completedRequests int32 + } + clients := []client{ + {username: "highqps", qps: 100.0, concurrencyMultiplier: 2.0}, + {username: "lowqps", qps: 5.0, concurrencyMultiplier: 0.5}, + } + + framework.Logf("getting real concurrency") + realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName) + for i := range clients { + clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier) + if clients[i].concurrency < 1 { + clients[i].concurrency = 1 + } + framework.Logf("request concurrency for %q will be %d", clients[i].username, clients[i].concurrency) + } ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String())) var wg sync.WaitGroup @@ -274,10 +209,119 @@ var _ = SIGDescribe("API priority and fairness", func() { }) }) +// createPriorityLevel creates a priority level with the provided assured +// concurrency share. +func createPriorityLevel(f *framework.Framework, priorityLevelName string, assuredConcurrencyShares int32) (*flowcontrol.PriorityLevelConfiguration, func()) { + createdPriorityLevel, err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Create( + context.TODO(), + &flowcontrol.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: priorityLevelName, + }, + Spec: flowcontrol.PriorityLevelConfigurationSpec{ + Type: flowcontrol.PriorityLevelEnablementLimited, + Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ + AssuredConcurrencyShares: assuredConcurrencyShares, + LimitResponse: flowcontrol.LimitResponse{ + Type: flowcontrol.LimitResponseTypeReject, + }, + }, + }, + }, + metav1.CreateOptions{}) + framework.ExpectNoError(err) + return createdPriorityLevel, func() { + framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Delete(context.TODO(), priorityLevelName, metav1.DeleteOptions{})) + } +} + +func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 { + resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO()) + framework.ExpectNoError(err) + sampleDecoder := expfmt.SampleDecoder{ + Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText), + Opts: &expfmt.DecodeOptions{}, + } + for { + var v model.Vector + err := sampleDecoder.Decode(&v) + if err == io.EOF { + break + } + framework.ExpectNoError(err) + for _, metric := range v { + if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName { + continue + } + if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName { + continue + } + return int32(metric.Value) + } + } + framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName)) + return 0 +} + +// createFlowSchema creates a flow schema referring to a particular priority +// level and matching the username provided. +func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPrecedence int32, priorityLevelName string, matchingUsername string) (*flowcontrol.FlowSchema, func()) { + var subjects []flowcontrol.Subject + if matchingUsername == "*" { + subjects = append(subjects, flowcontrol.Subject{ + Kind: flowcontrol.SubjectKindGroup, + Group: &flowcontrol.GroupSubject{ + Name: user.AllAuthenticated, + }, + }) + } else { + subjects = append(subjects, flowcontrol.Subject{ + Kind: flowcontrol.SubjectKindUser, + User: &flowcontrol.UserSubject{ + Name: matchingUsername, + }, + }) + } + + createdFlowSchema, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Create( + context.TODO(), + &flowcontrol.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: flowSchemaName, + }, + Spec: flowcontrol.FlowSchemaSpec{ + MatchingPrecedence: matchingPrecedence, + PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ + Name: priorityLevelName, + }, + DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{ + Type: flowcontrol.FlowDistinguisherMethodByUserType, + }, + Rules: []flowcontrol.PolicyRulesWithSubjects{ + { + Subjects: subjects, + NonResourceRules: []flowcontrol.NonResourcePolicyRule{ + { + Verbs: []string{flowcontrol.VerbAll}, + NonResourceURLs: []string{flowcontrol.NonResourceAll}, + }, + }, + }, + }, + }, + }, + metav1.CreateOptions{}) + framework.ExpectNoError(err) + return createdFlowSchema, func() { + framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Delete(context.TODO(), flowSchemaName, metav1.DeleteOptions{})) + } +} + // makeRequests creates a request to the API server and returns the response. func makeRequest(f *framework.Framework, username string) *http.Response { - config := rest.CopyConfig(f.ClientConfig()) + config := f.ClientConfig() config.Impersonate.UserName = username + config.Impersonate.Groups = []string{"system:authenticated"} roundTripper, err := rest.TransportFor(config) framework.ExpectNoError(err) @@ -350,5 +394,5 @@ func uniformQPSLoadConcurrent(f *framework.Framework, username string, concurren }() } wg.Wait() - return atomic.LoadInt32(&completed) + return completed }