diff --git a/build/visible_to/BUILD b/build/visible_to/BUILD index ff06d6aea7a..e850a3e040f 100644 --- a/build/visible_to/BUILD +++ b/build/visible_to/BUILD @@ -435,6 +435,7 @@ package_group( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e_node", + "//test/integration/apiserver/flowcontrol", "//vendor/...", ], ) diff --git a/test/integration/apiserver/flowcontrol/BUILD b/test/integration/apiserver/flowcontrol/BUILD index 6bc8be792a5..fb107f7abb0 100644 --- a/test/integration/apiserver/flowcontrol/BUILD +++ b/test/integration/apiserver/flowcontrol/BUILD @@ -21,7 +21,6 @@ go_test( "//test/integration/framework:go_default_library", "//vendor/github.com/prometheus/common/expfmt:go_default_library", "//vendor/github.com/prometheus/common/model:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go index 08ca660adc7..f6a30677549 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -27,7 +27,6 @@ import ( "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,9 +42,10 @@ import ( ) const ( - dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total" - dispatchedRequestCountMetricsLabelPriorityLevel = "priorityLevel" - timeout = time.Second * 10 + sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total" + labelPriorityLevel = "priorityLevel" + timeout = time.Second * 10 ) func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { @@ -57,8 +57,8 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1", }) - masterConfig.GenericConfig.MaxRequestsInFlight = 5 - masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 5 + masterConfig.GenericConfig.MaxRequestsInFlight = 1 + masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1 masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig _, s, closeFn := framework.RunAMaster(masterConfig) @@ -81,33 +81,59 @@ func TestPriorityLevelIsolation(t *testing.T) { priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( loopbackClient, "noxu1", concurrencyShares, queueLength) - require.NoError(t, err) + if err != nil { + t.Error(err) + } priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( loopbackClient, "noxu2", concurrencyShares, queueLength) - require.NoError(t, err) + if err != nil { + t.Error(err) + } + + sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } + + if 1 != sharedConcurrency[priorityLevelNoxu1.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1) + } + if 1 != sharedConcurrency[priorityLevelNoxu2.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1) + } stopCh := make(chan struct{}) defer close(stopCh) + // "elephant" streamRequests(concurrencyShares+queueLength, func() { _, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) - require.NoError(t, err) + if err != nil { + t.Error(err) + } }, stopCh) // "mouse" - streamRequests(1, func() { + streamRequests(3, func() { _, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) - require.NoError(t, err) + if err != nil { + t.Error(err) + } }, stopCh) time.Sleep(time.Second * 10) // running in background for a while reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name] noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name] - if (noxu1RequestCount / 2) > noxu2RequestCount { - t.Errorf("total requests made by noxu2 should at least half of noxu1: (%d:%d)", noxu1RequestCount, noxu2RequestCount) + // Theoretically, the actual expected value of request counts upon the two priority-level should be + // the equal. We're deliberately lax to make flakes super rare. + if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount { + t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount) } } @@ -123,12 +149,51 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf return clientset.NewForConfigOrDie(config) } -func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) { +func getMetrics(c clientset.Interface) (string, error) { resp, err := c.CoreV1(). RESTClient(). Get(). RequestURI("/metrics"). DoRaw(context.Background()) + if err != nil { + return "", err + } + return string(resp), err +} + +func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) { + resp, err := getMetrics(c) + if err != nil { + return nil, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + concurrency := make(map[string]int) + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. + return concurrency, nil + } + return nil, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case sharedConcurrencyMetricsName: + concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + } + } + } +} + +func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) { + resp, err := getMetrics(c) if err != nil { return nil, err } @@ -152,7 +217,7 @@ func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, erro for _, metric := range v { switch name := string(metric.Metric[model.MetricNameLabel]); name { case dispatchedRequestCountMetricsName: - reqCounts[string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel])] = int(metric.Value) + reqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) } } }