mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #96646 from adtac/apfe2e-2
APF e2e tests: add request drown-out fairness test
This commit is contained in:
commit
1c49b4425b
@ -72,6 +72,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
|
||||||
|
@ -32,13 +32,14 @@ import (
|
|||||||
|
|
||||||
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
|
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
|
requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
|
||||||
requestConcurrencyLimitMetricLabelName = "priorityLevel"
|
requestConcurrencyLimitMetricLabelName = "priority_level"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = SIGDescribe("API priority and fairness", func() {
|
var _ = SIGDescribe("API priority and fairness", func() {
|
||||||
@ -51,73 +52,14 @@ var _ = SIGDescribe("API priority and fairness", func() {
|
|||||||
nonMatchingUsername := "foo"
|
nonMatchingUsername := "foo"
|
||||||
|
|
||||||
ginkgo.By("creating a testing prioritylevel")
|
ginkgo.By("creating a testing prioritylevel")
|
||||||
createdPriorityLevel, err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Create(
|
createdPriorityLevel, cleanup := createPriorityLevel(f, testingPriorityLevelName, 1)
|
||||||
context.TODO(),
|
defer cleanup()
|
||||||
&flowcontrol.PriorityLevelConfiguration{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: testingPriorityLevelName,
|
|
||||||
},
|
|
||||||
Spec: flowcontrol.PriorityLevelConfigurationSpec{
|
|
||||||
Type: flowcontrol.PriorityLevelEnablementLimited,
|
|
||||||
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
|
|
||||||
AssuredConcurrencyShares: 1, // will have at minimum 1 concurrency share
|
|
||||||
LimitResponse: flowcontrol.LimitResponse{
|
|
||||||
Type: flowcontrol.LimitResponseTypeReject,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
metav1.CreateOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
// clean-ups
|
|
||||||
err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Delete(context.TODO(), testingPriorityLevelName, metav1.DeleteOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
err = f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Delete(context.TODO(), testingFlowSchemaName, metav1.DeleteOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
}()
|
|
||||||
|
|
||||||
ginkgo.By("creating a testing flowschema")
|
ginkgo.By("creating a testing flowschema")
|
||||||
createdFlowSchema, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Create(
|
createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, matchingUsername)
|
||||||
context.TODO(),
|
defer cleanup()
|
||||||
&flowcontrol.FlowSchema{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: testingFlowSchemaName,
|
|
||||||
},
|
|
||||||
Spec: flowcontrol.FlowSchemaSpec{
|
|
||||||
MatchingPrecedence: 1000, // a rather higher precedence to ensure it make effect
|
|
||||||
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
|
|
||||||
Name: testingPriorityLevelName,
|
|
||||||
},
|
|
||||||
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
|
|
||||||
Type: flowcontrol.FlowDistinguisherMethodByUserType,
|
|
||||||
},
|
|
||||||
Rules: []flowcontrol.PolicyRulesWithSubjects{
|
|
||||||
{
|
|
||||||
Subjects: []flowcontrol.Subject{
|
|
||||||
{
|
|
||||||
Kind: flowcontrol.SubjectKindUser,
|
|
||||||
User: &flowcontrol.UserSubject{
|
|
||||||
Name: matchingUsername,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
NonResourceRules: []flowcontrol.NonResourcePolicyRule{
|
|
||||||
{
|
|
||||||
Verbs: []string{flowcontrol.VerbAll},
|
|
||||||
NonResourceURLs: []string{flowcontrol.NonResourceAll},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
metav1.CreateOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
|
|
||||||
ginkgo.By("response headers should contain flow-schema/priority-level uid")
|
|
||||||
|
|
||||||
|
ginkgo.By("checking response headers contain flow-schema/priority-level uid")
|
||||||
if !testResponseHeaderMatches(f, matchingUsername, string(createdPriorityLevel.UID), string(createdFlowSchema.UID)) {
|
if !testResponseHeaderMatches(f, matchingUsername, string(createdPriorityLevel.UID), string(createdFlowSchema.UID)) {
|
||||||
framework.Failf("matching user doesnt received UID for the testing priority-level and flow-schema")
|
framework.Failf("matching user doesnt received UID for the testing priority-level and flow-schema")
|
||||||
}
|
}
|
||||||
@ -126,7 +68,12 @@ var _ = SIGDescribe("API priority and fairness", func() {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
ginkgo.It("should ensure that requests can't be drowned out", func() {
|
// This test creates two flow schemas and a corresponding priority level for
|
||||||
|
// each flow schema. One flow schema has a higher match precedence. With two
|
||||||
|
// clients making requests at different rates, we test to make sure that the
|
||||||
|
// higher QPS client cannot drown out the other one despite having higher
|
||||||
|
// priority.
|
||||||
|
ginkgo.It("should ensure that requests can't be drowned out (priority)", func() {
|
||||||
flowSchemaNamePrefix := "e2e-testing-flowschema"
|
flowSchemaNamePrefix := "e2e-testing-flowschema"
|
||||||
priorityLevelNamePrefix := "e2e-testing-prioritylevel"
|
priorityLevelNamePrefix := "e2e-testing-prioritylevel"
|
||||||
loadDuration := 10 * time.Second
|
loadDuration := 10 * time.Second
|
||||||
@ -141,112 +88,100 @@ var _ = SIGDescribe("API priority and fairness", func() {
|
|||||||
completedRequests int32
|
completedRequests int32
|
||||||
}
|
}
|
||||||
clients := []client{
|
clients := []client{
|
||||||
// "elephant" refers to a client that creates requests at a much higher
|
// "highqps" refers to a client that creates requests at a much higher
|
||||||
// QPS than its counter-part and well above its concurrency share limit.
|
// QPS than its counter-part and well above its concurrency share limit.
|
||||||
// In contrast, the mouse stays under its concurrency shares.
|
// In contrast, "lowqps" stays under its concurrency shares.
|
||||||
// Additionally, the "elephant" client also has a higher matching
|
// Additionally, the "highqps" client also has a higher matching
|
||||||
// precedence for its flow schema.
|
// precedence for its flow schema.
|
||||||
{username: "elephant", qps: 100.0, concurrencyMultiplier: 2.0, matchingPrecedence: 999},
|
{username: "highqps", qps: 100.0, concurrencyMultiplier: 2.0, matchingPrecedence: 999},
|
||||||
{username: "mouse", qps: 5.0, concurrencyMultiplier: 0.5, matchingPrecedence: 1000},
|
{username: "lowqps", qps: 5.0, concurrencyMultiplier: 0.5, matchingPrecedence: 1000},
|
||||||
}
|
}
|
||||||
|
|
||||||
ginkgo.By("creating test priority levels and flow schemas")
|
ginkgo.By("creating test priority levels and flow schemas")
|
||||||
for i := range clients {
|
for i := range clients {
|
||||||
clients[i].priorityLevelName = fmt.Sprintf("%s-%s", priorityLevelNamePrefix, clients[i].username)
|
clients[i].priorityLevelName = fmt.Sprintf("%s-%s", priorityLevelNamePrefix, clients[i].username)
|
||||||
framework.Logf("creating PriorityLevel %q", clients[i].priorityLevelName)
|
framework.Logf("creating PriorityLevel %q", clients[i].priorityLevelName)
|
||||||
_, err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Create(
|
_, cleanup := createPriorityLevel(f, clients[i].priorityLevelName, 1)
|
||||||
context.TODO(),
|
defer cleanup()
|
||||||
&flowcontrol.PriorityLevelConfiguration{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: clients[i].priorityLevelName,
|
|
||||||
},
|
|
||||||
Spec: flowcontrol.PriorityLevelConfigurationSpec{
|
|
||||||
Type: flowcontrol.PriorityLevelEnablementLimited,
|
|
||||||
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
|
|
||||||
AssuredConcurrencyShares: 1,
|
|
||||||
LimitResponse: flowcontrol.LimitResponse{
|
|
||||||
Type: flowcontrol.LimitResponseTypeReject,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
metav1.CreateOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
defer func(name string) {
|
|
||||||
framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Delete(context.TODO(), name, metav1.DeleteOptions{}))
|
|
||||||
}(clients[i].priorityLevelName)
|
|
||||||
clients[i].flowSchemaName = fmt.Sprintf("%s-%s", flowSchemaNamePrefix, clients[i].username)
|
clients[i].flowSchemaName = fmt.Sprintf("%s-%s", flowSchemaNamePrefix, clients[i].username)
|
||||||
framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
|
framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
|
||||||
_, err = f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Create(
|
_, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, clients[i].username)
|
||||||
context.TODO(),
|
defer cleanup()
|
||||||
&flowcontrol.FlowSchema{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: clients[i].flowSchemaName,
|
|
||||||
},
|
|
||||||
Spec: flowcontrol.FlowSchemaSpec{
|
|
||||||
MatchingPrecedence: clients[i].matchingPrecedence,
|
|
||||||
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
|
|
||||||
Name: clients[i].priorityLevelName,
|
|
||||||
},
|
|
||||||
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
|
|
||||||
Type: flowcontrol.FlowDistinguisherMethodByUserType,
|
|
||||||
},
|
|
||||||
Rules: []flowcontrol.PolicyRulesWithSubjects{
|
|
||||||
{
|
|
||||||
Subjects: []flowcontrol.Subject{
|
|
||||||
{
|
|
||||||
Kind: flowcontrol.SubjectKindUser,
|
|
||||||
User: &flowcontrol.UserSubject{
|
|
||||||
Name: clients[i].username,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
NonResourceRules: []flowcontrol.NonResourcePolicyRule{
|
|
||||||
{
|
|
||||||
Verbs: []string{flowcontrol.VerbAll},
|
|
||||||
NonResourceURLs: []string{flowcontrol.NonResourceAll},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
metav1.CreateOptions{})
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
defer func(name string) {
|
|
||||||
framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Delete(context.TODO(), name, metav1.DeleteOptions{}))
|
|
||||||
}(clients[i].flowSchemaName)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ginkgo.By("getting request concurrency from metrics")
|
ginkgo.By("getting request concurrency from metrics")
|
||||||
for i := range clients {
|
for i := range clients {
|
||||||
resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
|
realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName)
|
||||||
framework.ExpectNoError(err)
|
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
|
||||||
sampleDecoder := expfmt.SampleDecoder{
|
|
||||||
Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
|
|
||||||
Opts: &expfmt.DecodeOptions{},
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
var v model.Vector
|
|
||||||
err := sampleDecoder.Decode(&v)
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
for _, metric := range v {
|
|
||||||
if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != clients[i].priorityLevelName {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
clients[i].concurrency = int32(float64(metric.Value) * clients[i].concurrencyMultiplier)
|
|
||||||
if clients[i].concurrency < 1 {
|
if clients[i].concurrency < 1 {
|
||||||
clients[i].concurrency = 1
|
clients[i].concurrency = 1
|
||||||
}
|
}
|
||||||
framework.Logf("request concurrency for %q will be %d (concurrency share = %d)", clients[i].username, clients[i].concurrency, int32(metric.Value))
|
framework.Logf("request concurrency for %q will be %d (concurrency share = %d)", clients[i].username, clients[i].concurrency, realConcurrency)
|
||||||
|
}
|
||||||
|
|
||||||
|
ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := range clients {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(c *client) {
|
||||||
|
defer wg.Done()
|
||||||
|
framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps)
|
||||||
|
c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration)
|
||||||
|
}(&clients[i])
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
ginkgo.By("checking completed requests with expected values")
|
||||||
|
for _, client := range clients {
|
||||||
|
// Each client should have 95% of its ideal number of completed requests.
|
||||||
|
maxCompletedRequests := float64(client.concurrency) * client.qps * float64(loadDuration/time.Second)
|
||||||
|
fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
|
||||||
|
framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
|
||||||
|
if fractionCompleted < 0.95 {
|
||||||
|
framework.Failf("client %q: got %.1f%% completed requests, want at least 95%%", client.username, 100*fractionCompleted)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// This test has two clients (different usernames) making requests at
|
||||||
|
// different rates. Both clients' requests get mapped to the same flow schema
|
||||||
|
// and priority level. We expect APF's "ByUser" flow distinguisher to isolate
|
||||||
|
// the two clients and not allow one client to drown out the other despite
|
||||||
|
// having a higher QPS.
|
||||||
|
ginkgo.It("should ensure that requests can't be drowned out (fairness)", func() {
|
||||||
|
priorityLevelName := "e2e-testing-prioritylevel"
|
||||||
|
flowSchemaName := "e2e-testing-flowschema"
|
||||||
|
loadDuration := 10 * time.Second
|
||||||
|
|
||||||
|
framework.Logf("creating PriorityLevel %q", priorityLevelName)
|
||||||
|
_, cleanup := createPriorityLevel(f, priorityLevelName, 1)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
framework.Logf("creating FlowSchema %q", flowSchemaName)
|
||||||
|
_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, "*")
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
type client struct {
|
||||||
|
username string
|
||||||
|
qps float64
|
||||||
|
concurrencyMultiplier float64
|
||||||
|
concurrency int32
|
||||||
|
completedRequests int32
|
||||||
|
}
|
||||||
|
clients := []client{
|
||||||
|
{username: "highqps", qps: 100.0, concurrencyMultiplier: 2.0},
|
||||||
|
{username: "lowqps", qps: 5.0, concurrencyMultiplier: 0.5},
|
||||||
|
}
|
||||||
|
|
||||||
|
framework.Logf("getting real concurrency")
|
||||||
|
realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName)
|
||||||
|
for i := range clients {
|
||||||
|
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
|
||||||
|
if clients[i].concurrency < 1 {
|
||||||
|
clients[i].concurrency = 1
|
||||||
|
}
|
||||||
|
framework.Logf("request concurrency for %q will be %d", clients[i].username, clients[i].concurrency)
|
||||||
}
|
}
|
||||||
|
|
||||||
ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
|
ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
|
||||||
@ -274,10 +209,119 @@ var _ = SIGDescribe("API priority and fairness", func() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// createPriorityLevel creates a priority level with the provided assured
|
||||||
|
// concurrency share.
|
||||||
|
func createPriorityLevel(f *framework.Framework, priorityLevelName string, assuredConcurrencyShares int32) (*flowcontrol.PriorityLevelConfiguration, func()) {
|
||||||
|
createdPriorityLevel, err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Create(
|
||||||
|
context.TODO(),
|
||||||
|
&flowcontrol.PriorityLevelConfiguration{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: priorityLevelName,
|
||||||
|
},
|
||||||
|
Spec: flowcontrol.PriorityLevelConfigurationSpec{
|
||||||
|
Type: flowcontrol.PriorityLevelEnablementLimited,
|
||||||
|
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
|
||||||
|
AssuredConcurrencyShares: assuredConcurrencyShares,
|
||||||
|
LimitResponse: flowcontrol.LimitResponse{
|
||||||
|
Type: flowcontrol.LimitResponseTypeReject,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
metav1.CreateOptions{})
|
||||||
|
framework.ExpectNoError(err)
|
||||||
|
return createdPriorityLevel, func() {
|
||||||
|
framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Delete(context.TODO(), priorityLevelName, metav1.DeleteOptions{}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 {
|
||||||
|
resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
|
||||||
|
framework.ExpectNoError(err)
|
||||||
|
sampleDecoder := expfmt.SampleDecoder{
|
||||||
|
Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
|
||||||
|
Opts: &expfmt.DecodeOptions{},
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
var v model.Vector
|
||||||
|
err := sampleDecoder.Decode(&v)
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
framework.ExpectNoError(err)
|
||||||
|
for _, metric := range v {
|
||||||
|
if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return int32(metric.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// createFlowSchema creates a flow schema referring to a particular priority
|
||||||
|
// level and matching the username provided.
|
||||||
|
func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPrecedence int32, priorityLevelName string, matchingUsername string) (*flowcontrol.FlowSchema, func()) {
|
||||||
|
var subjects []flowcontrol.Subject
|
||||||
|
if matchingUsername == "*" {
|
||||||
|
subjects = append(subjects, flowcontrol.Subject{
|
||||||
|
Kind: flowcontrol.SubjectKindGroup,
|
||||||
|
Group: &flowcontrol.GroupSubject{
|
||||||
|
Name: user.AllAuthenticated,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
subjects = append(subjects, flowcontrol.Subject{
|
||||||
|
Kind: flowcontrol.SubjectKindUser,
|
||||||
|
User: &flowcontrol.UserSubject{
|
||||||
|
Name: matchingUsername,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
createdFlowSchema, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Create(
|
||||||
|
context.TODO(),
|
||||||
|
&flowcontrol.FlowSchema{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: flowSchemaName,
|
||||||
|
},
|
||||||
|
Spec: flowcontrol.FlowSchemaSpec{
|
||||||
|
MatchingPrecedence: matchingPrecedence,
|
||||||
|
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
|
||||||
|
Name: priorityLevelName,
|
||||||
|
},
|
||||||
|
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
|
||||||
|
Type: flowcontrol.FlowDistinguisherMethodByUserType,
|
||||||
|
},
|
||||||
|
Rules: []flowcontrol.PolicyRulesWithSubjects{
|
||||||
|
{
|
||||||
|
Subjects: subjects,
|
||||||
|
NonResourceRules: []flowcontrol.NonResourcePolicyRule{
|
||||||
|
{
|
||||||
|
Verbs: []string{flowcontrol.VerbAll},
|
||||||
|
NonResourceURLs: []string{flowcontrol.NonResourceAll},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
metav1.CreateOptions{})
|
||||||
|
framework.ExpectNoError(err)
|
||||||
|
return createdFlowSchema, func() {
|
||||||
|
framework.ExpectNoError(f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Delete(context.TODO(), flowSchemaName, metav1.DeleteOptions{}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// makeRequests creates a request to the API server and returns the response.
|
// makeRequests creates a request to the API server and returns the response.
|
||||||
func makeRequest(f *framework.Framework, username string) *http.Response {
|
func makeRequest(f *framework.Framework, username string) *http.Response {
|
||||||
config := rest.CopyConfig(f.ClientConfig())
|
config := f.ClientConfig()
|
||||||
config.Impersonate.UserName = username
|
config.Impersonate.UserName = username
|
||||||
|
config.Impersonate.Groups = []string{"system:authenticated"}
|
||||||
roundTripper, err := rest.TransportFor(config)
|
roundTripper, err := rest.TransportFor(config)
|
||||||
framework.ExpectNoError(err)
|
framework.ExpectNoError(err)
|
||||||
|
|
||||||
@ -350,5 +394,5 @@ func uniformQPSLoadConcurrent(f *framework.Framework, username string, concurren
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return atomic.LoadInt32(&completed)
|
return completed
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user