mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
assert shared concurrency
This commit is contained in:
parent
875407a450
commit
df5dfb46b7
@ -435,6 +435,7 @@ package_group(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
|
||||||
"//staging/src/k8s.io/component-base/metrics/...",
|
"//staging/src/k8s.io/component-base/metrics/...",
|
||||||
"//test/e2e_node",
|
"//test/e2e_node",
|
||||||
|
"//test/integration/apiserver/flowcontrol",
|
||||||
"//vendor/...",
|
"//vendor/...",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -21,7 +21,6 @@ go_test(
|
|||||||
"//test/integration/framework:go_default_library",
|
"//test/integration/framework:go_default_library",
|
||||||
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
|
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
|
||||||
"//vendor/github.com/prometheus/common/model:go_default_library",
|
"//vendor/github.com/prometheus/common/model:go_default_library",
|
||||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,7 +27,6 @@ import (
|
|||||||
|
|
||||||
"github.com/prometheus/common/expfmt"
|
"github.com/prometheus/common/expfmt"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
|
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -43,9 +42,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
|
sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit"
|
||||||
dispatchedRequestCountMetricsLabelPriorityLevel = "priorityLevel"
|
dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
|
||||||
timeout = time.Second * 10
|
labelPriorityLevel = "priorityLevel"
|
||||||
|
timeout = time.Second * 10
|
||||||
)
|
)
|
||||||
|
|
||||||
func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
|
func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
|
||||||
@ -57,8 +57,8 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
|
|||||||
Group: "flowcontrol.apiserver.k8s.io",
|
Group: "flowcontrol.apiserver.k8s.io",
|
||||||
Version: "v1alpha1",
|
Version: "v1alpha1",
|
||||||
})
|
})
|
||||||
masterConfig.GenericConfig.MaxRequestsInFlight = 5
|
masterConfig.GenericConfig.MaxRequestsInFlight = 1
|
||||||
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 5
|
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1
|
||||||
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
|
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
|
||||||
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
|
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
|
||||||
_, s, closeFn := framework.RunAMaster(masterConfig)
|
_, s, closeFn := framework.RunAMaster(masterConfig)
|
||||||
@ -81,33 +81,59 @@ func TestPriorityLevelIsolation(t *testing.T) {
|
|||||||
|
|
||||||
priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
|
priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
|
||||||
loopbackClient, "noxu1", concurrencyShares, queueLength)
|
loopbackClient, "noxu1", concurrencyShares, queueLength)
|
||||||
require.NoError(t, err)
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
|
priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
|
||||||
loopbackClient, "noxu2", concurrencyShares, queueLength)
|
loopbackClient, "noxu2", concurrencyShares, queueLength)
|
||||||
require.NoError(t, err)
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if 1 != sharedConcurrency[priorityLevelNoxu1.Name] {
|
||||||
|
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1)
|
||||||
|
}
|
||||||
|
if 1 != sharedConcurrency[priorityLevelNoxu2.Name] {
|
||||||
|
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1)
|
||||||
|
}
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
// "elephant"
|
// "elephant"
|
||||||
streamRequests(concurrencyShares+queueLength, func() {
|
streamRequests(concurrencyShares+queueLength, func() {
|
||||||
_, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
|
_, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
|
||||||
require.NoError(t, err)
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
}, stopCh)
|
}, stopCh)
|
||||||
// "mouse"
|
// "mouse"
|
||||||
streamRequests(1, func() {
|
streamRequests(3, func() {
|
||||||
_, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
|
_, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
|
||||||
require.NoError(t, err)
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
}, stopCh)
|
}, stopCh)
|
||||||
|
|
||||||
time.Sleep(time.Second * 10) // running in background for a while
|
time.Sleep(time.Second * 10) // running in background for a while
|
||||||
|
|
||||||
reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
|
reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name]
|
noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name]
|
||||||
noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name]
|
noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name]
|
||||||
|
|
||||||
if (noxu1RequestCount / 2) > noxu2RequestCount {
|
// Theoretically, the actual expected value of request counts upon the two priority-level should be
|
||||||
t.Errorf("total requests made by noxu2 should at least half of noxu1: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
|
// the equal. We're deliberately lax to make flakes super rare.
|
||||||
|
if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount {
|
||||||
|
t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -123,12 +149,51 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf
|
|||||||
return clientset.NewForConfigOrDie(config)
|
return clientset.NewForConfigOrDie(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
|
func getMetrics(c clientset.Interface) (string, error) {
|
||||||
resp, err := c.CoreV1().
|
resp, err := c.CoreV1().
|
||||||
RESTClient().
|
RESTClient().
|
||||||
Get().
|
Get().
|
||||||
RequestURI("/metrics").
|
RequestURI("/metrics").
|
||||||
DoRaw(context.Background())
|
DoRaw(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(resp), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
|
||||||
|
resp, err := getMetrics(c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
|
||||||
|
decoder := expfmt.SampleDecoder{
|
||||||
|
Dec: dec,
|
||||||
|
Opts: &expfmt.DecodeOptions{},
|
||||||
|
}
|
||||||
|
|
||||||
|
concurrency := make(map[string]int)
|
||||||
|
for {
|
||||||
|
var v model.Vector
|
||||||
|
if err := decoder.Decode(&v); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
// Expected loop termination condition.
|
||||||
|
return concurrency, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("failed decoding metrics: %v", err)
|
||||||
|
}
|
||||||
|
for _, metric := range v {
|
||||||
|
switch name := string(metric.Metric[model.MetricNameLabel]); name {
|
||||||
|
case sharedConcurrencyMetricsName:
|
||||||
|
concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
|
||||||
|
resp, err := getMetrics(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -152,7 +217,7 @@ func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, erro
|
|||||||
for _, metric := range v {
|
for _, metric := range v {
|
||||||
switch name := string(metric.Metric[model.MetricNameLabel]); name {
|
switch name := string(metric.Metric[model.MetricNameLabel]); name {
|
||||||
case dispatchedRequestCountMetricsName:
|
case dispatchedRequestCountMetricsName:
|
||||||
reqCounts[string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel])] = int(metric.Value)
|
reqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user