From 30bc0fce486bd39ec21e746d361651598b4bea14 Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Thu, 19 Mar 2020 21:33:35 +0800 Subject: [PATCH 1/3] integration test for priority-level isolation --- test/integration/apiserver/BUILD | 1 + test/integration/apiserver/flowcontrol/BUILD | 41 +++ .../apiserver/flowcontrol/concurrency_test.go | 249 ++++++++++++++++++ .../apiserver/flowcontrol/main_test.go | 27 ++ test/integration/framework/BUILD | 3 + test/integration/framework/master_utils.go | 13 + 6 files changed, 334 insertions(+) create mode 100644 test/integration/apiserver/flowcontrol/BUILD create mode 100644 test/integration/apiserver/flowcontrol/concurrency_test.go create mode 100644 test/integration/apiserver/flowcontrol/main_test.go diff --git a/test/integration/apiserver/BUILD b/test/integration/apiserver/BUILD index 6a5f46fb8f2..4e96ba3f20b 100644 --- a/test/integration/apiserver/BUILD +++ b/test/integration/apiserver/BUILD @@ -90,6 +90,7 @@ filegroup( "//test/integration/apiserver/admissionwebhook:all-srcs", "//test/integration/apiserver/apply:all-srcs", "//test/integration/apiserver/certreload:all-srcs", + "//test/integration/apiserver/flowcontrol:all-srcs", "//test/integration/apiserver/podlogs:all-srcs", ], tags = ["automanaged"], diff --git a/test/integration/apiserver/flowcontrol/BUILD b/test/integration/apiserver/flowcontrol/BUILD new file mode 100644 index 00000000000..bc8b2c2f3a4 --- /dev/null +++ b/test/integration/apiserver/flowcontrol/BUILD @@ -0,0 +1,41 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "go_default_test", + srcs = [ + "concurrency_test.go", + "main_test.go", + ], + tags = ["integration"], + deps = [ + "//pkg/master:go_default_library", + "//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//test/integration/framework:go_default_library", + "//vendor/github.com/prometheus/common/expfmt:go_default_library", + "//vendor/github.com/prometheus/common/model:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go new file mode 100644 index 00000000000..3471d8a7eea --- /dev/null +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -0,0 +1,249 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "context" + "fmt" + "io" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/test/integration/framework" +) + +const ( + dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total" + dispatchedRequestCountMetricsLabelPriorityLevel = "priorityLevel" + timeout = time.Second * 10 +) + +func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { + opts := framework.MasterConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()} + opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" + masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts) + resourceConfig := master.DefaultAPIResourceConfigSource() + resourceConfig.EnableVersions(schema.GroupVersion{ + Group: "flowcontrol.apiserver.k8s.io", + Version: "v1alpha1", + }) + masterConfig.GenericConfig.MaxRequestsInFlight = 5 + masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 5 + masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() + masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig + _, s, closeFn := framework.RunAMaster(masterConfig) + + return s, masterConfig.GenericConfig.LoopbackClientConfig, closeFn +} + +func TestPriorityLevelIsolation(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() + // NOTE: disabling the feature should fail the test + _, loopbackConfig, closeFn := setup(t) + defer closeFn() + + loopbackClient := clientset.NewForConfigOrDie(loopbackConfig) + noxu1Client := getClientFor(loopbackConfig, "noxu1") + noxu2Client := getClientFor(loopbackConfig, "noxu2") + + priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu1") + require.NoError(t, err) + priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu2") + require.NoError(t, err) + + wg := &sync.WaitGroup{} + // "elephant" + streamRequests(wg, 10, 100, func() { + _, err := noxu1Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + require.NoError(t, err) + }) + + streamRequests(nil, 1, 100, func() { + _, err := noxu2Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + require.NoError(t, err) + }) + + wg.Wait() + + dispatchedCountNoxu1, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu1.Name) + require.NoError(t, err) + dispatchedCountNoxu2, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu2.Name) + require.NoError(t, err) + + assert.Equal(t, 1000, dispatchedCountNoxu1) + assert.Equal(t, 100, dispatchedCountNoxu2) +} + +func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface { + config := &rest.Config{ + Host: loopbackConfig.Host, + QPS: -1, + BearerToken: loopbackConfig.BearerToken, + Impersonate: rest.ImpersonationConfig{ + UserName: username, + }, + } + return clientset.NewForConfigOrDie(config) +} + +func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName string) (int, error) { + resp, err := c.CoreV1(). + RESTClient(). + Get(). + RequestURI("/metrics"). + DoRaw(context.TODO()) + if err != nil { + return 0, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. + return 0, fmt.Errorf("no dispatched-count metrics found for priorityLevel %v", priorityLevelName) + } + return 0, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case dispatchedRequestCountMetricsName: + if priorityLevelName == string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel]) { + return int(metric.Value), nil + } + } + } + } +} + +func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) { + pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.TODO(), &flowcontrolv1alpha1.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: username, + }, + Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{ + Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited, + Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{ + AssuredConcurrencyShares: 10, + LimitResponse: flowcontrolv1alpha1.LimitResponse{ + Type: flowcontrolv1alpha1.LimitResponseTypeQueue, + Queuing: &flowcontrolv1alpha1.QueuingConfiguration{ + Queues: 100, + HandSize: 1, + QueueLengthLimit: 10, + }, + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, nil, err + } + fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Create(context.TODO(), &flowcontrolv1alpha1.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: username, + }, + Spec: flowcontrolv1alpha1.FlowSchemaSpec{ + DistinguisherMethod: &flowcontrolv1alpha1.FlowDistinguisherMethod{ + Type: flowcontrolv1alpha1.FlowDistinguisherMethodByUserType, + }, + MatchingPrecedence: 1000, + PriorityLevelConfiguration: flowcontrolv1alpha1.PriorityLevelConfigurationReference{ + Name: username, + }, + Rules: []flowcontrolv1alpha1.PolicyRulesWithSubjects{ + { + ResourceRules: []flowcontrolv1alpha1.ResourcePolicyRule{ + { + Verbs: []string{flowcontrolv1alpha1.VerbAll}, + APIGroups: []string{flowcontrolv1alpha1.APIGroupAll}, + Resources: []string{flowcontrolv1alpha1.ResourceAll}, + Namespaces: []string{flowcontrolv1alpha1.NamespaceEvery}, + ClusterScope: true, + }, + }, + Subjects: []flowcontrolv1alpha1.Subject{ + { + Kind: flowcontrolv1alpha1.SubjectKindUser, + User: &flowcontrolv1alpha1.UserSubject{ + Name: username, + }, + }, + }, + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, nil, err + } + + return pl, fs, wait.Poll(time.Second, timeout, func() (bool, error) { + fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Get(context.TODO(), username, metav1.GetOptions{}) + if err != nil { + return false, err + } + for _, condition := range fs.Status.Conditions { + if condition.Type == flowcontrolv1alpha1.FlowSchemaConditionDangling { + if condition.Status == flowcontrolv1alpha1.ConditionFalse { + return true, nil + } + } + } + return false, nil + }) +} + +func streamRequests(wg *sync.WaitGroup, parallel, times int, request func()) { + for i := 0; i < parallel; i++ { + if wg != nil { + wg.Add(1) + } + go func() { + for j := 0; j < times; j++ { + request() + } + if wg != nil { + wg.Done() + } + }() + } +} diff --git a/test/integration/apiserver/flowcontrol/main_test.go b/test/integration/apiserver/flowcontrol/main_test.go new file mode 100644 index 00000000000..41cf0c86e8c --- /dev/null +++ b/test/integration/apiserver/flowcontrol/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/framework/BUILD b/test/integration/framework/BUILD index eea9f1b2292..8a9113ad3d7 100644 --- a/test/integration/framework/BUILD +++ b/test/integration/framework/BUILD @@ -62,10 +62,13 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/union:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 52127655ce5..bfa88015e2b 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -39,10 +39,13 @@ import ( "k8s.io/apiserver/pkg/authorization/authorizerfactory" authorizerunion "k8s.io/apiserver/pkg/authorization/union" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" + genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/options" serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -191,6 +194,16 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv } masterConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout) + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) { + masterConfig.GenericConfig.FlowControl = utilflowcontrol.New( + masterConfig.ExtraConfig.VersionedInformers, + clientset.FlowcontrolV1alpha1(), + masterConfig.GenericConfig.MaxRequestsInFlight+masterConfig.GenericConfig.MaxMutatingRequestsInFlight, + masterConfig.GenericConfig.RequestTimeout/4, + ) + } + m, err = masterConfig.Complete().New(genericapiserver.NewEmptyDelegate()) if err != nil { // We log the error first so that even if closeFn crashes, the error is shown From 875407a450892355744441cb550cc13d16a1cacc Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Fri, 20 Mar 2020 01:46:19 +0800 Subject: [PATCH 2/3] add flowcontrol integration test to import whitelist --- .../src/k8s.io/component-base/metrics/BUILD | 1 + test/integration/apiserver/flowcontrol/BUILD | 1 - .../apiserver/flowcontrol/concurrency_test.go | 83 ++++++++++--------- 3 files changed, 44 insertions(+), 41 deletions(-) diff --git a/staging/src/k8s.io/component-base/metrics/BUILD b/staging/src/k8s.io/component-base/metrics/BUILD index 4da26f4d462..99d652ae35a 100644 --- a/staging/src/k8s.io/component-base/metrics/BUILD +++ b/staging/src/k8s.io/component-base/metrics/BUILD @@ -93,6 +93,7 @@ package_group( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e_node", + "//test/integration/apiserver/flowcontrol", "//vendor/...", ], ) diff --git a/test/integration/apiserver/flowcontrol/BUILD b/test/integration/apiserver/flowcontrol/BUILD index bc8b2c2f3a4..6bc8be792a5 100644 --- a/test/integration/apiserver/flowcontrol/BUILD +++ b/test/integration/apiserver/flowcontrol/BUILD @@ -21,7 +21,6 @@ go_test( "//test/integration/framework:go_default_library", "//vendor/github.com/prometheus/common/expfmt:go_default_library", "//vendor/github.com/prometheus/common/model:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go index 3471d8a7eea..08ca660adc7 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -22,13 +22,11 @@ import ( "io" "net/http/httptest" "strings" - "sync" "testing" "time" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" @@ -78,32 +76,39 @@ func TestPriorityLevelIsolation(t *testing.T) { noxu1Client := getClientFor(loopbackConfig, "noxu1") noxu2Client := getClientFor(loopbackConfig, "noxu2") - priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu1") + queueLength := 50 + concurrencyShares := 1 + + priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu1", concurrencyShares, queueLength) require.NoError(t, err) - priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu2") + priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu2", concurrencyShares, queueLength) require.NoError(t, err) - wg := &sync.WaitGroup{} + stopCh := make(chan struct{}) + defer close(stopCh) // "elephant" - streamRequests(wg, 10, 100, func() { - _, err := noxu1Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + streamRequests(concurrencyShares+queueLength, func() { + _, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) require.NoError(t, err) - }) - - streamRequests(nil, 1, 100, func() { - _, err := noxu2Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + }, stopCh) + // "mouse" + streamRequests(1, func() { + _, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) require.NoError(t, err) - }) + }, stopCh) - wg.Wait() + time.Sleep(time.Second * 10) // running in background for a while - dispatchedCountNoxu1, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu1.Name) - require.NoError(t, err) - dispatchedCountNoxu2, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu2.Name) - require.NoError(t, err) + reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) - assert.Equal(t, 1000, dispatchedCountNoxu1) - assert.Equal(t, 100, dispatchedCountNoxu2) + noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name] + noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name] + + if (noxu1RequestCount / 2) > noxu2RequestCount { + t.Errorf("total requests made by noxu2 should at least half of noxu1: (%d:%d)", noxu1RequestCount, noxu2RequestCount) + } } func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface { @@ -118,14 +123,14 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf return clientset.NewForConfigOrDie(config) } -func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName string) (int, error) { +func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) { resp, err := c.CoreV1(). RESTClient(). Get(). RequestURI("/metrics"). - DoRaw(context.TODO()) + DoRaw(context.Background()) if err != nil { - return 0, err + return nil, err } dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) @@ -134,41 +139,40 @@ func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName str Opts: &expfmt.DecodeOptions{}, } + reqCounts := make(map[string]int) for { var v model.Vector if err := decoder.Decode(&v); err != nil { if err == io.EOF { // Expected loop termination condition. - return 0, fmt.Errorf("no dispatched-count metrics found for priorityLevel %v", priorityLevelName) + return reqCounts, nil } - return 0, fmt.Errorf("failed decoding metrics: %v", err) + return nil, fmt.Errorf("failed decoding metrics: %v", err) } for _, metric := range v { switch name := string(metric.Metric[model.MetricNameLabel]); name { case dispatchedRequestCountMetricsName: - if priorityLevelName == string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel]) { - return int(metric.Value), nil - } + reqCounts[string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel])] = int(metric.Value) } } } } -func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) { - pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.TODO(), &flowcontrolv1alpha1.PriorityLevelConfiguration{ +func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) { + pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.Background(), &flowcontrolv1alpha1.PriorityLevelConfiguration{ ObjectMeta: metav1.ObjectMeta{ Name: username, }, Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{ Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited, Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{ - AssuredConcurrencyShares: 10, + AssuredConcurrencyShares: int32(concurrencyShares), LimitResponse: flowcontrolv1alpha1.LimitResponse{ Type: flowcontrolv1alpha1.LimitResponseTypeQueue, Queuing: &flowcontrolv1alpha1.QueuingConfiguration{ Queues: 100, HandSize: 1, - QueueLengthLimit: 10, + QueueLengthLimit: int32(queuelength), }, }, }, @@ -232,17 +236,16 @@ func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, usern }) } -func streamRequests(wg *sync.WaitGroup, parallel, times int, request func()) { +func streamRequests(parallel int, request func(), stopCh <-chan struct{}) { for i := 0; i < parallel; i++ { - if wg != nil { - wg.Add(1) - } go func() { - for j := 0; j < times; j++ { - request() - } - if wg != nil { - wg.Done() + for { + select { + case <-stopCh: + return + default: + request() + } } }() } From df5dfb46b78691d6e641f765c1a21a2eb554d60e Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Fri, 3 Apr 2020 21:13:29 +0800 Subject: [PATCH 3/3] assert shared concurrency --- build/visible_to/BUILD | 1 + test/integration/apiserver/flowcontrol/BUILD | 1 - .../apiserver/flowcontrol/concurrency_test.go | 95 ++++++++++++++++--- 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/build/visible_to/BUILD b/build/visible_to/BUILD index ff06d6aea7a..e850a3e040f 100644 --- a/build/visible_to/BUILD +++ b/build/visible_to/BUILD @@ -435,6 +435,7 @@ package_group( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e_node", + "//test/integration/apiserver/flowcontrol", "//vendor/...", ], ) diff --git a/test/integration/apiserver/flowcontrol/BUILD b/test/integration/apiserver/flowcontrol/BUILD index 6bc8be792a5..fb107f7abb0 100644 --- a/test/integration/apiserver/flowcontrol/BUILD +++ b/test/integration/apiserver/flowcontrol/BUILD @@ -21,7 +21,6 @@ go_test( "//test/integration/framework:go_default_library", "//vendor/github.com/prometheus/common/expfmt:go_default_library", "//vendor/github.com/prometheus/common/model:go_default_library", - "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go index 08ca660adc7..f6a30677549 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -27,7 +27,6 @@ import ( "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,9 +42,10 @@ import ( ) const ( - dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total" - dispatchedRequestCountMetricsLabelPriorityLevel = "priorityLevel" - timeout = time.Second * 10 + sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total" + labelPriorityLevel = "priorityLevel" + timeout = time.Second * 10 ) func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { @@ -57,8 +57,8 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1", }) - masterConfig.GenericConfig.MaxRequestsInFlight = 5 - masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 5 + masterConfig.GenericConfig.MaxRequestsInFlight = 1 + masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1 masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig _, s, closeFn := framework.RunAMaster(masterConfig) @@ -81,33 +81,59 @@ func TestPriorityLevelIsolation(t *testing.T) { priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( loopbackClient, "noxu1", concurrencyShares, queueLength) - require.NoError(t, err) + if err != nil { + t.Error(err) + } priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( loopbackClient, "noxu2", concurrencyShares, queueLength) - require.NoError(t, err) + if err != nil { + t.Error(err) + } + + sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } + + if 1 != sharedConcurrency[priorityLevelNoxu1.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1) + } + if 1 != sharedConcurrency[priorityLevelNoxu2.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1) + } stopCh := make(chan struct{}) defer close(stopCh) + // "elephant" streamRequests(concurrencyShares+queueLength, func() { _, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) - require.NoError(t, err) + if err != nil { + t.Error(err) + } }, stopCh) // "mouse" - streamRequests(1, func() { + streamRequests(3, func() { _, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) - require.NoError(t, err) + if err != nil { + t.Error(err) + } }, stopCh) time.Sleep(time.Second * 10) // running in background for a while reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name] noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name] - if (noxu1RequestCount / 2) > noxu2RequestCount { - t.Errorf("total requests made by noxu2 should at least half of noxu1: (%d:%d)", noxu1RequestCount, noxu2RequestCount) + // Theoretically, the actual expected value of request counts upon the two priority-level should be + // the equal. We're deliberately lax to make flakes super rare. + if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount { + t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount) } } @@ -123,12 +149,51 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf return clientset.NewForConfigOrDie(config) } -func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) { +func getMetrics(c clientset.Interface) (string, error) { resp, err := c.CoreV1(). RESTClient(). Get(). RequestURI("/metrics"). DoRaw(context.Background()) + if err != nil { + return "", err + } + return string(resp), err +} + +func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) { + resp, err := getMetrics(c) + if err != nil { + return nil, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + concurrency := make(map[string]int) + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. + return concurrency, nil + } + return nil, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case sharedConcurrencyMetricsName: + concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + } + } + } +} + +func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) { + resp, err := getMetrics(c) if err != nil { return nil, err } @@ -152,7 +217,7 @@ func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, erro for _, metric := range v { switch name := string(metric.Metric[model.MetricNameLabel]); name { case dispatchedRequestCountMetricsName: - reqCounts[string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel])] = int(metric.Value) + reqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) } } }