Merge pull request #96984 from adtac/apfe2e-3

APF e2e test: wait for steady state before proceeding

Commit ba5f5bea64
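The change below gates each test on a new waitForSteadyState helper that polls with wait.Poll until the APF controller reports the FlowSchema as synced and its concurrency-limit metric appears. As a reading aid, here is a minimal standalone sketch of the wait.Poll contract the helper relies on; checkOnce and errNotReady are hypothetical stand-ins for the real status and metrics checks, not code from this PR.

package main

import (
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// errNotReady stands in for a retryable condition such as
// errPriorityLevelNotFound in the test below.
var errNotReady = errors.New("not ready yet")

// checkOnce is a placeholder for one round of the real checks
// (FlowSchema status condition plus concurrency-limit metric).
func checkOnce() error {
    return errNotReady
}

func main() {
    err := wait.Poll(time.Second, 5*time.Second, func() (bool, error) {
        if err := checkOnce(); err != nil {
            if errors.Is(err, errNotReady) {
                // Retryable: report "not done" and poll again after the interval.
                return false, nil
            }
            // Unexpected: abort the poll immediately with this error.
            return false, err
        }
        // Steady state reached: stop polling successfully.
        return true, nil
    })
    // With the placeholder above this always times out; the real helper uses
    // a 30-second timeout (shortened to 5s here).
    fmt.Println("poll result:", err)
}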
@@ -77,6 +77,7 @@ go_library(
         "//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
         "//staging/src/k8s.io/client-go/discovery:go_default_library",
@@ -19,6 +19,7 @@ package apimachinery
 import (
     "bytes"
     "context"
+    "errors"
     "fmt"
     "io"
     "net/http"
@@ -32,14 +33,21 @@ import (

     flowcontrol "k8s.io/api/flowcontrol/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/apiserver/pkg/util/apihelpers"
+    clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
     clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
     "k8s.io/kubernetes/test/e2e/framework"
 )

 const (
-    requestConcurrencyLimitMetricName      = "apiserver_flowcontrol_request_concurrency_limit"
-    requestConcurrencyLimitMetricLabelName = "priority_level"
+    requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
+    priorityLevelLabelName            = "priority_level"
 )

+var (
+    errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label")
+)
+
 var _ = SIGDescribe("API priority and fairness", func() {
@@ -59,6 +67,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
         createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})
         defer cleanup()

+        ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
+        waitForSteadyState(f, testingFlowSchemaName, testingPriorityLevelName)
+
         var response *http.Response
         ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user")
         response = makeRequest(f, matchingUsername)
@@ -126,11 +137,15 @@ var _ = SIGDescribe("API priority and fairness", func() {
             framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
             _, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})
             defer cleanup()
+
+            ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
+            waitForSteadyState(f, clients[i].flowSchemaName, clients[i].priorityLevelName)
         }

         ginkgo.By("getting request concurrency from metrics")
         for i := range clients {
-            realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName)
+            realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName)
+            framework.ExpectNoError(err)
             clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
             if clients[i].concurrency < 1 {
                 clients[i].concurrency = 1
@@ -185,6 +200,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
         _, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})
         defer cleanup()

+        ginkgo.By("waiting for testing flow schema and priority level to reach steady state")
+        waitForSteadyState(f, flowSchemaName, priorityLevelName)
+
         type client struct {
             username string
             qps      float64
@@ -199,7 +217,8 @@ var _ = SIGDescribe("API priority and fairness", func() {
         }

         framework.Logf("getting real concurrency")
-        realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName)
+        realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
+        framework.ExpectNoError(err)
         for i := range clients {
             clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
             if clients[i].concurrency < 1 {
@@ -259,10 +278,11 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, assur
     }
 }

-//lint:ignore U1000 function is actually referenced
-func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 {
-    resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
-    framework.ExpectNoError(err)
+func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) {
+    resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
+    if err != nil {
+        return 0, err
+    }
     sampleDecoder := expfmt.SampleDecoder{
         Dec:  expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
         Opts: &expfmt.DecodeOptions{},
@@ -270,22 +290,23 @@ func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName strin
     for {
         var v model.Vector
         err := sampleDecoder.Decode(&v)
-        if err == io.EOF {
-            break
+        if err != nil {
+            if err == io.EOF {
+                break
+            }
+            return 0, err
         }
-        framework.ExpectNoError(err)
         for _, metric := range v {
             if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
                 continue
             }
-            if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName {
+            if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
                 continue
             }
-            return int32(metric.Value)
+            return int32(metric.Value), nil
         }
     }
-    framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName))
-    return 0
+    return 0, errPriorityLevelNotFound
 }

 // createFlowSchema creates a flow schema referring to a particular priority
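For reference, getPriorityLevelConcurrency scrapes /metrics and decodes the Prometheus text format with expfmt, as in the hunk above. The following standalone sketch mirrors that decode loop against a made-up payload; the priority level names and values here are invented for illustration and are not from the PR.

package main

import (
    "bytes"
    "fmt"
    "io"

    "github.com/prometheus/common/expfmt"
    "github.com/prometheus/common/model"
)

// payload imitates two samples of the metric the test reads; the label
// values and numbers are fabricated for the example.
const payload = `apiserver_flowcontrol_request_concurrency_limit{priority_level="e2e-testing-prioritylevel"} 5
apiserver_flowcontrol_request_concurrency_limit{priority_level="global-default"} 49
`

func main() {
    dec := expfmt.SampleDecoder{
        Dec:  expfmt.NewDecoder(bytes.NewBufferString(payload), expfmt.FmtText),
        Opts: &expfmt.DecodeOptions{},
    }
    for {
        var v model.Vector
        if err := dec.Decode(&v); err != nil {
            if err == io.EOF {
                break // all samples consumed
            }
            panic(err)
        }
        for _, s := range v {
            // Keep only the concurrency-limit metric, then read its
            // priority_level label and integer value.
            if string(s.Metric[model.MetricNameLabel]) != "apiserver_flowcontrol_request_concurrency_limit" {
                continue
            }
            fmt.Printf("priority_level=%s limit=%d\n", s.Metric["priority_level"], int32(s.Value))
        }
    }
}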
@@ -335,6 +356,35 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
     }
 }

+// waitForSteadyState repeatedly polls the API server to check if the newly
+// created flow schema and priority level have been seen by the APF controller
+// by checking: (1) the dangling priority level reference condition in the flow
+// schema status, and (2) metrics. The function times out after 30 seconds.
+func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityLevelName string) {
+    framework.ExpectNoError(wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
+        fs, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Get(context.TODO(), flowSchemaName, metav1.GetOptions{})
+        if err != nil {
+            return false, err
+        }
+        condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
+        if condition == nil || condition.Status != flowcontrol.ConditionFalse {
+            // The absence of the dangling status object implies that the APF
+            // controller isn't done with syncing the flow schema object. And, of
+            // course, the condition being anything but false means that steady state
+            // hasn't been achieved.
+            return false, nil
+        }
+        _, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
+        if err != nil {
+            if err == errPriorityLevelNotFound {
+                return false, nil
+            }
+            return false, err
+        }
+        return true, nil
+    }))
+}
+
 // makeRequests creates a request to the API server and returns the response.
 func makeRequest(f *framework.Framework, username string) *http.Response {
     config := f.ClientConfig()
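The "steady state" that waitForSteadyState checks has two parts: the FlowSchema's Dangling condition must be present with status False (the APF controller has synced the object and its priority level reference resolves), and the priority level must already show up in the concurrency-limit metric. Below is a small sketch of the first check, using the same v1beta1 types and apihelpers call as the test file at this commit; the FlowSchema object is constructed by hand purely for illustration.

package main

import (
    "fmt"

    flowcontrol "k8s.io/api/flowcontrol/v1beta1"
    "k8s.io/apiserver/pkg/util/apihelpers"
)

func main() {
    // A hand-built FlowSchema whose status already carries the Dangling
    // condition with status False, i.e. what the controller writes once it
    // has synced the object and resolved its priority level reference.
    fs := &flowcontrol.FlowSchema{
        Status: flowcontrol.FlowSchemaStatus{
            Conditions: []flowcontrol.FlowSchemaCondition{
                {Type: flowcontrol.FlowSchemaConditionDangling, Status: flowcontrol.ConditionFalse},
            },
        },
    }
    condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
    steady := condition != nil && condition.Status == flowcontrol.ConditionFalse
    fmt.Println("flow schema synced and not dangling:", steady)
}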