diff --git a/build/visible_to/BUILD b/build/visible_to/BUILD index ff06d6aea7a..e850a3e040f 100644 --- a/build/visible_to/BUILD +++ b/build/visible_to/BUILD @@ -435,6 +435,7 @@ package_group( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e_node", + "//test/integration/apiserver/flowcontrol", "//vendor/...", ], ) diff --git a/staging/src/k8s.io/component-base/metrics/BUILD b/staging/src/k8s.io/component-base/metrics/BUILD index 4da26f4d462..99d652ae35a 100644 --- a/staging/src/k8s.io/component-base/metrics/BUILD +++ b/staging/src/k8s.io/component-base/metrics/BUILD @@ -93,6 +93,7 @@ package_group( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e_node", + "//test/integration/apiserver/flowcontrol", "//vendor/...", ], ) diff --git a/test/integration/apiserver/BUILD b/test/integration/apiserver/BUILD index 6a5f46fb8f2..4e96ba3f20b 100644 --- a/test/integration/apiserver/BUILD +++ b/test/integration/apiserver/BUILD @@ -90,6 +90,7 @@ filegroup( "//test/integration/apiserver/admissionwebhook:all-srcs", "//test/integration/apiserver/apply:all-srcs", "//test/integration/apiserver/certreload:all-srcs", + "//test/integration/apiserver/flowcontrol:all-srcs", "//test/integration/apiserver/podlogs:all-srcs", ], tags = ["automanaged"], diff --git a/test/integration/apiserver/flowcontrol/BUILD b/test/integration/apiserver/flowcontrol/BUILD new file mode 100644 index 00000000000..fb107f7abb0 --- /dev/null +++ b/test/integration/apiserver/flowcontrol/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "go_default_test", + srcs = [ + "concurrency_test.go", + "main_test.go", + ], + tags = ["integration"], + deps = [ + "//pkg/master:go_default_library", + "//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//test/integration/framework:go_default_library", + "//vendor/github.com/prometheus/common/expfmt:go_default_library", + "//vendor/github.com/prometheus/common/model:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go new file mode 100644 index 00000000000..f6a30677549 --- /dev/null +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -0,0 +1,317 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "context" + "fmt" + "io" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/test/integration/framework" +) + +const ( + sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total" + labelPriorityLevel = "priorityLevel" + timeout = time.Second * 10 +) + +func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { + opts := framework.MasterConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()} + opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" + masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts) + resourceConfig := master.DefaultAPIResourceConfigSource() + resourceConfig.EnableVersions(schema.GroupVersion{ + Group: "flowcontrol.apiserver.k8s.io", + Version: "v1alpha1", + }) + masterConfig.GenericConfig.MaxRequestsInFlight = 1 + masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1 + masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() + masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig + _, s, closeFn := framework.RunAMaster(masterConfig) + + return s, masterConfig.GenericConfig.LoopbackClientConfig, closeFn +} + +func TestPriorityLevelIsolation(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() + // NOTE: disabling the feature should fail the test + _, loopbackConfig, closeFn := setup(t) + defer closeFn() + + loopbackClient := clientset.NewForConfigOrDie(loopbackConfig) + noxu1Client := getClientFor(loopbackConfig, "noxu1") + noxu2Client := getClientFor(loopbackConfig, "noxu2") + + queueLength := 50 + concurrencyShares := 1 + + priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu1", concurrencyShares, queueLength) + if err != nil { + t.Error(err) + } + priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu2", concurrencyShares, queueLength) + if err != nil { + t.Error(err) + } + + sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } + + if 1 != sharedConcurrency[priorityLevelNoxu1.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1) + } + if 1 != sharedConcurrency[priorityLevelNoxu2.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + + // "elephant" + streamRequests(concurrencyShares+queueLength, func() { + _, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Error(err) + } + }, stopCh) + // "mouse" + streamRequests(3, func() { + _, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Error(err) + } + }, stopCh) + + time.Sleep(time.Second * 10) // running in background for a while + + reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } + + noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name] + noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name] + + // Theoretically, the actual expected value of request counts upon the two priority-level should be + // the equal. We're deliberately lax to make flakes super rare. + if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount { + t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount) + } +} + +func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface { + config := &rest.Config{ + Host: loopbackConfig.Host, + QPS: -1, + BearerToken: loopbackConfig.BearerToken, + Impersonate: rest.ImpersonationConfig{ + UserName: username, + }, + } + return clientset.NewForConfigOrDie(config) +} + +func getMetrics(c clientset.Interface) (string, error) { + resp, err := c.CoreV1(). + RESTClient(). + Get(). + RequestURI("/metrics"). + DoRaw(context.Background()) + if err != nil { + return "", err + } + return string(resp), err +} + +func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) { + resp, err := getMetrics(c) + if err != nil { + return nil, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + concurrency := make(map[string]int) + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. + return concurrency, nil + } + return nil, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case sharedConcurrencyMetricsName: + concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + } + } + } +} + +func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) { + resp, err := getMetrics(c) + if err != nil { + return nil, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + reqCounts := make(map[string]int) + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. + return reqCounts, nil + } + return nil, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case dispatchedRequestCountMetricsName: + reqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + } + } + } +} + +func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) { + pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.Background(), &flowcontrolv1alpha1.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: username, + }, + Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{ + Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited, + Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{ + AssuredConcurrencyShares: int32(concurrencyShares), + LimitResponse: flowcontrolv1alpha1.LimitResponse{ + Type: flowcontrolv1alpha1.LimitResponseTypeQueue, + Queuing: &flowcontrolv1alpha1.QueuingConfiguration{ + Queues: 100, + HandSize: 1, + QueueLengthLimit: int32(queuelength), + }, + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, nil, err + } + fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Create(context.TODO(), &flowcontrolv1alpha1.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: username, + }, + Spec: flowcontrolv1alpha1.FlowSchemaSpec{ + DistinguisherMethod: &flowcontrolv1alpha1.FlowDistinguisherMethod{ + Type: flowcontrolv1alpha1.FlowDistinguisherMethodByUserType, + }, + MatchingPrecedence: 1000, + PriorityLevelConfiguration: flowcontrolv1alpha1.PriorityLevelConfigurationReference{ + Name: username, + }, + Rules: []flowcontrolv1alpha1.PolicyRulesWithSubjects{ + { + ResourceRules: []flowcontrolv1alpha1.ResourcePolicyRule{ + { + Verbs: []string{flowcontrolv1alpha1.VerbAll}, + APIGroups: []string{flowcontrolv1alpha1.APIGroupAll}, + Resources: []string{flowcontrolv1alpha1.ResourceAll}, + Namespaces: []string{flowcontrolv1alpha1.NamespaceEvery}, + ClusterScope: true, + }, + }, + Subjects: []flowcontrolv1alpha1.Subject{ + { + Kind: flowcontrolv1alpha1.SubjectKindUser, + User: &flowcontrolv1alpha1.UserSubject{ + Name: username, + }, + }, + }, + }, + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, nil, err + } + + return pl, fs, wait.Poll(time.Second, timeout, func() (bool, error) { + fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Get(context.TODO(), username, metav1.GetOptions{}) + if err != nil { + return false, err + } + for _, condition := range fs.Status.Conditions { + if condition.Type == flowcontrolv1alpha1.FlowSchemaConditionDangling { + if condition.Status == flowcontrolv1alpha1.ConditionFalse { + return true, nil + } + } + } + return false, nil + }) +} + +func streamRequests(parallel int, request func(), stopCh <-chan struct{}) { + for i := 0; i < parallel; i++ { + go func() { + for { + select { + case <-stopCh: + return + default: + request() + } + } + }() + } +} diff --git a/test/integration/apiserver/flowcontrol/main_test.go b/test/integration/apiserver/flowcontrol/main_test.go new file mode 100644 index 00000000000..41cf0c86e8c --- /dev/null +++ b/test/integration/apiserver/flowcontrol/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/framework/BUILD b/test/integration/framework/BUILD index 6b46ce43a6f..e0dfc1166f5 100644 --- a/test/integration/framework/BUILD +++ b/test/integration/framework/BUILD @@ -61,10 +61,13 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/union:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 52127655ce5..bfa88015e2b 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -39,10 +39,13 @@ import ( "k8s.io/apiserver/pkg/authorization/authorizerfactory" authorizerunion "k8s.io/apiserver/pkg/authorization/union" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" + genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/options" serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -191,6 +194,16 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv } masterConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout) + + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) { + masterConfig.GenericConfig.FlowControl = utilflowcontrol.New( + masterConfig.ExtraConfig.VersionedInformers, + clientset.FlowcontrolV1alpha1(), + masterConfig.GenericConfig.MaxRequestsInFlight+masterConfig.GenericConfig.MaxMutatingRequestsInFlight, + masterConfig.GenericConfig.RequestTimeout/4, + ) + } + m, err = masterConfig.Complete().New(genericapiserver.NewEmptyDelegate()) if err != nil { // We log the error first so that even if closeFn crashes, the error is shown