mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
APF e2e tests: add request drown-out fairness test
Signed-off-by: Adhityaa Chandrasekar <adtac@google.com>
This commit is contained in:
parent
e827708635
commit
16fc690d3a
@ -71,6 +71,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
|
||||
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
)
|
||||
@ -67,7 +68,12 @@ var _ = SIGDescribe("API priority and fairness", func() {
|
||||
}
|
||||
})
|
||||
|
||||
ginkgo.It("should ensure that requests can't be drowned out", func() {
|
||||
// This test creates two flow schemas and a corresponding priority level for
|
||||
// each flow schema. One flow schema has a higher match precedence. With two
|
||||
// clients making requests at different rates, we test to make sure that the
|
||||
// higher QPS client cannot drown out the other one despite having higher
|
||||
// priority.
|
||||
ginkgo.It("should ensure that requests can't be drowned out (priority)", func() {
|
||||
flowSchemaNamePrefix := "e2e-testing-flowschema"
|
||||
priorityLevelNamePrefix := "e2e-testing-prioritylevel"
|
||||
loadDuration := 10 * time.Second
|
||||
@ -137,6 +143,70 @@ var _ = SIGDescribe("API priority and fairness", func() {
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// This test has two clients (different usernames) making requests at
|
||||
// different rates. Both clients' requests get mapped to the same flow schema
|
||||
// and priority level. We expect APF's "ByUser" flow distinguisher to isolate
|
||||
// the two clients and not allow one client to drown out the other despite
|
||||
// having a higher QPS.
|
||||
ginkgo.It("should ensure that requests can't be drowned out (fairness)", func() {
|
||||
priorityLevelName := "e2e-testing-prioritylevel"
|
||||
flowSchemaName := "e2e-testing-flowschema"
|
||||
loadDuration := 10 * time.Second
|
||||
|
||||
framework.Logf("creating PriorityLevel %q", priorityLevelName)
|
||||
_, cleanup := createPriorityLevel(f, priorityLevelName, 1)
|
||||
defer cleanup()
|
||||
|
||||
framework.Logf("creating FlowSchema %q", flowSchemaName)
|
||||
_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, "*")
|
||||
defer cleanup()
|
||||
|
||||
type client struct {
|
||||
username string
|
||||
qps float64
|
||||
concurrencyMultiplier float64
|
||||
concurrency int32
|
||||
completedRequests int32
|
||||
}
|
||||
clients := []client{
|
||||
{username: "highqps", qps: 100.0, concurrencyMultiplier: 2.0},
|
||||
{username: "lowqps", qps: 5.0, concurrencyMultiplier: 0.5},
|
||||
}
|
||||
|
||||
framework.Logf("getting real concurrency")
|
||||
realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName)
|
||||
for i := range clients {
|
||||
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
|
||||
if clients[i].concurrency < 1 {
|
||||
clients[i].concurrency = 1
|
||||
}
|
||||
framework.Logf("request concurrency for %q will be %d", clients[i].username, clients[i].concurrency)
|
||||
}
|
||||
|
||||
ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
|
||||
var wg sync.WaitGroup
|
||||
for i := range clients {
|
||||
wg.Add(1)
|
||||
go func(c *client) {
|
||||
defer wg.Done()
|
||||
framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps)
|
||||
c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration)
|
||||
}(&clients[i])
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
ginkgo.By("checking completed requests with expected values")
|
||||
for _, client := range clients {
|
||||
// Each client should have 95% of its ideal number of completed requests.
|
||||
maxCompletedRequests := float64(client.concurrency) * client.qps * float64(loadDuration/time.Second)
|
||||
fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
|
||||
framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
|
||||
if fractionCompleted < 0.95 {
|
||||
framework.Failf("client %q: got %.1f%% completed requests, want at least 95%%", client.username, 100*fractionCompleted)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// createPriorityLevel creates a priority level with the provided assured
|
||||
@ -196,6 +266,23 @@ func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName strin
|
||||
// createFlowSchema creates a flow schema referring to a particular priority
|
||||
// level and matching the username provided.
|
||||
func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPrecedence int32, priorityLevelName string, matchingUsername string) (*flowcontrol.FlowSchema, func()) {
|
||||
var subjects []flowcontrol.Subject
|
||||
if matchingUsername == "*" {
|
||||
subjects = append(subjects, flowcontrol.Subject{
|
||||
Kind: flowcontrol.SubjectKindGroup,
|
||||
Group: &flowcontrol.GroupSubject{
|
||||
Name: user.AllAuthenticated,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
subjects = append(subjects, flowcontrol.Subject{
|
||||
Kind: flowcontrol.SubjectKindUser,
|
||||
User: &flowcontrol.UserSubject{
|
||||
Name: matchingUsername,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
createdFlowSchema, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Create(
|
||||
context.TODO(),
|
||||
&flowcontrol.FlowSchema{
|
||||
@ -212,14 +299,7 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
|
||||
},
|
||||
Rules: []flowcontrol.PolicyRulesWithSubjects{
|
||||
{
|
||||
Subjects: []flowcontrol.Subject{
|
||||
{
|
||||
Kind: flowcontrol.SubjectKindUser,
|
||||
User: &flowcontrol.UserSubject{
|
||||
Name: matchingUsername,
|
||||
},
|
||||
},
|
||||
},
|
||||
Subjects: subjects,
|
||||
NonResourceRules: []flowcontrol.NonResourcePolicyRule{
|
||||
{
|
||||
Verbs: []string{flowcontrol.VerbAll},
|
||||
@ -239,8 +319,9 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
|
||||
|
||||
// makeRequests creates a request to the API server and returns the response.
|
||||
func makeRequest(f *framework.Framework, username string) *http.Response {
|
||||
config := rest.CopyConfig(f.ClientConfig())
|
||||
config := f.ClientConfig()
|
||||
config.Impersonate.UserName = username
|
||||
config.Impersonate.Groups = []string{"system:authenticated"}
|
||||
roundTripper, err := rest.TransportFor(config)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
@ -313,5 +394,5 @@ func uniformQPSLoadConcurrent(f *framework.Framework, username string, concurren
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
return atomic.LoadInt32(&completed)
|
||||
return completed
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user