Mirror of https://github.com/k3s-io/kubernetes.git
APF e2e: wait for steady state before proceeding

Signed-off-by: Adhityaa Chandrasekar <adtac@google.com>

commit 1ae5ed9f6f (parent 334b426a6b)
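In outline, each affected test now creates its FlowSchema and PriorityLevelConfiguration, waits for the API Priority and Fairness (APF) controller to observe them, and only then reads the concurrency-limit metric. A condensed, non-literal sketch of that flow, using the helpers added or changed in the diff below (variable names and arguments are illustrative, not the exact test code):

	_, cleanup := createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{username})
	defer cleanup()

	// New in this commit: block (up to 30s) until the APF controller has seen
	// the FlowSchema and PriorityLevelConfiguration.
	ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
	waitForSteadyState(f, flowSchemaName, priorityLevelName)

	// getPriorityLevelConcurrency now takes a clientset and returns an error
	// instead of asserting internally.
	realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
	framework.ExpectNoError(err)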
@@ -75,6 +75,7 @@ go_library(
         "//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
         "//staging/src/k8s.io/client-go/discovery:go_default_library",
@@ -19,6 +19,7 @@ package apimachinery
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -32,6 +33,9 @@ import (
 
 	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apiserver/pkg/util/apihelpers"
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -39,7 +43,11 @@ import (
 
 const (
 	requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
-	requestConcurrencyLimitMetricLabelName = "priority_level"
+	priorityLevelLabelName            = "priority_level"
+)
+
+var (
+	errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label")
 )
 
 var _ = SIGDescribe("API priority and fairness", func() {
@@ -59,6 +67,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
 		createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})
 		defer cleanup()
 
+		ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
+		waitForSteadyState(f, testingFlowSchemaName, testingPriorityLevelName)
+
 		var response *http.Response
 		ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user")
 		response = makeRequest(f, matchingUsername)
@@ -126,11 +137,15 @@ var _ = SIGDescribe("API priority and fairness", func() {
 			framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
 			_, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})
 			defer cleanup()
+
+			ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
+			waitForSteadyState(f, clients[i].flowSchemaName, clients[i].priorityLevelName)
 		}
 
 		ginkgo.By("getting request concurrency from metrics")
 		for i := range clients {
-			realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName)
+			realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName)
+			framework.ExpectNoError(err)
 			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
 			if clients[i].concurrency < 1 {
 				clients[i].concurrency = 1
@@ -185,6 +200,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
 		_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})
 		defer cleanup()
 
+		ginkgo.By("waiting for testing flow schema and priority level to reach steady state")
+		waitForSteadyState(f, flowSchemaName, priorityLevelName)
+
 		type client struct {
 			username string
 			qps      float64
@@ -199,7 +217,8 @@ var _ = SIGDescribe("API priority and fairness", func() {
 		}
 
 		framework.Logf("getting real concurrency")
-		realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName)
+		realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
+		framework.ExpectNoError(err)
 		for i := range clients {
 			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
 			if clients[i].concurrency < 1 {
@@ -259,10 +278,11 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, assur
 	}
 }
 
-//lint:ignore U1000 function is actually referenced
-func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 {
-	resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
-	framework.ExpectNoError(err)
+func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) {
+	resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
+	if err != nil {
+		return 0, err
+	}
 	sampleDecoder := expfmt.SampleDecoder{
 		Dec:  expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
 		Opts: &expfmt.DecodeOptions{},
@@ -270,22 +290,23 @@ func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName strin
 	for {
 		var v model.Vector
 		err := sampleDecoder.Decode(&v)
+		if err != nil {
 			if err == io.EOF {
 				break
 			}
-			framework.ExpectNoError(err)
+			return 0, err
+		}
 		for _, metric := range v {
 			if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
 				continue
 			}
-			if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName {
+			if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
 				continue
 			}
-			return int32(metric.Value)
+			return int32(metric.Value), nil
 		}
 	}
-	framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName))
-	return 0
+	return 0, errPriorityLevelNotFound
 }
 
 // createFlowSchema creates a flow schema referring to a particular priority
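For context on the Prometheus plumbing used by getPriorityLevelConcurrency above, the standalone sketch below decodes a hard-coded /metrics payload with expfmt.SampleDecoder and filters samples by metric name and the priority_level label, mirroring the changed function. The payload, its sample values, and the helper name concurrencyFor are invented for illustration; the library calls and the metric/label names are the ones used in the diff.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"
)

const (
	requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
	priorityLevelLabelName            = "priority_level"
)

// concurrencyFor scans a Prometheus text exposition for the concurrency-limit
// sample whose priority_level label matches priorityLevelName.
func concurrencyFor(exposition []byte, priorityLevelName string) (int32, error) {
	dec := expfmt.SampleDecoder{
		Dec:  expfmt.NewDecoder(bytes.NewBuffer(exposition), expfmt.FmtText),
		Opts: &expfmt.DecodeOptions{},
	}
	for {
		var v model.Vector
		if err := dec.Decode(&v); err != nil {
			if err == io.EOF {
				break
			}
			return 0, err
		}
		for _, sample := range v {
			if string(sample.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
				continue
			}
			if string(sample.Metric[priorityLevelLabelName]) != priorityLevelName {
				continue
			}
			return int32(sample.Value), nil
		}
	}
	return 0, fmt.Errorf("no %s sample with %s=%q", requestConcurrencyLimitMetricName, priorityLevelLabelName, priorityLevelName)
}

func main() {
	// Fabricated /metrics excerpt; a real apiserver reports many more families.
	metrics := []byte(`# TYPE apiserver_flowcontrol_request_concurrency_limit gauge
apiserver_flowcontrol_request_concurrency_limit{priority_level="global-default"} 49
apiserver_flowcontrol_request_concurrency_limit{priority_level="workload-low"} 245
`)
	limit, err := concurrencyFor(metrics, "workload-low")
	if err != nil {
		panic(err)
	}
	fmt.Println(limit)
}

Running it prints 245, the value attached to the workload-low sample in the fabricated payload.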
@@ -335,6 +356,35 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
 	}
 }
 
+// waitForSteadyState repeatedly polls the API server to check if the newly
+// created flow schema and priority level have been seen by the APF controller
+// by checking: (1) the dangling priority level reference condition in the flow
+// schema status, and (2) metrics. The function times out after 30 seconds.
+func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityLevelName string) {
+	framework.ExpectNoError(wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
+		fs, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Get(context.TODO(), flowSchemaName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
+		if condition == nil || condition.Status != flowcontrol.ConditionFalse {
+			// The absence of the dangling status object implies that the APF
+			// controller isn't done with syncing the flow schema object. And, of
+			// course, the condition being anything but false means that steady state
+			// hasn't been achieved.
+			return false, nil
+		}
+		_, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
+		if err != nil {
+			if err == errPriorityLevelNotFound {
+				return false, nil
+			}
+			return false, err
+		}
+		return true, nil
+	}))
+}
+
 // makeRequests creates a request to the API server and returns the response.
 func makeRequest(f *framework.Framework, username string) *http.Response {
 	config := f.ClientConfig()
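A note on the polling primitive: waitForSteadyState is built on wait.Poll from k8s.io/apimachinery/pkg/util/wait, which calls a condition function once per interval until it reports done, returns an error, or the timeout expires. Below is a minimal standalone sketch of that contract, assuming nothing beyond the wait package; the readyAfter deadline is an arbitrary stand-in for the real checks (the FlowSchema Dangling condition and the concurrency-limit metric).

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Hypothetical readiness point standing in for "the APF controller has
	// synced the new FlowSchema and PriorityLevelConfiguration".
	readyAfter := time.Now().Add(3 * time.Second)

	err := wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
		if time.Now().Before(readyAfter) {
			// Not ready yet: (false, nil) makes Poll retry after the interval.
			return false, nil
		}
		// Done: (true, nil) ends the poll. Returning a non-nil error instead
		// would abort immediately, which is why waitForSteadyState maps the
		// "metric not found yet" sentinel to (false, nil) rather than an error.
		return true, nil
	})
	fmt.Println("steady state reached:", err == nil)
}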