add flowcontrol integration test to import whitelist

This commit is contained in:
yue9944882 2020-03-20 01:46:19 +08:00
parent 30bc0fce48
commit 875407a450
3 changed files with 44 additions and 41 deletions

View File

@ -93,6 +93,7 @@ package_group(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
"//staging/src/k8s.io/component-base/metrics/...", "//staging/src/k8s.io/component-base/metrics/...",
"//test/e2e_node", "//test/e2e_node",
"//test/integration/apiserver/flowcontrol",
"//vendor/...", "//vendor/...",
], ],
) )

View File

@ -21,7 +21,6 @@ go_test(
"//test/integration/framework:go_default_library", "//test/integration/framework:go_default_library",
"//vendor/github.com/prometheus/common/expfmt:go_default_library", "//vendor/github.com/prometheus/common/expfmt:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library", "//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
], ],
) )

View File

@ -22,13 +22,11 @@ import (
"io" "io"
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
@ -78,32 +76,39 @@ func TestPriorityLevelIsolation(t *testing.T) {
noxu1Client := getClientFor(loopbackConfig, "noxu1") noxu1Client := getClientFor(loopbackConfig, "noxu1")
noxu2Client := getClientFor(loopbackConfig, "noxu2") noxu2Client := getClientFor(loopbackConfig, "noxu2")
priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu1") queueLength := 50
concurrencyShares := 1
priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
loopbackClient, "noxu1", concurrencyShares, queueLength)
require.NoError(t, err) require.NoError(t, err)
priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu2") priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
loopbackClient, "noxu2", concurrencyShares, queueLength)
require.NoError(t, err) require.NoError(t, err)
wg := &sync.WaitGroup{} stopCh := make(chan struct{})
defer close(stopCh)
// "elephant" // "elephant"
streamRequests(wg, 10, 100, func() { streamRequests(concurrencyShares+queueLength, func() {
_, err := noxu1Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) _, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
require.NoError(t, err) require.NoError(t, err)
}) }, stopCh)
// "mouse"
streamRequests(nil, 1, 100, func() { streamRequests(1, func() {
_, err := noxu2Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) _, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
require.NoError(t, err) require.NoError(t, err)
}) }, stopCh)
wg.Wait() time.Sleep(time.Second * 10) // running in background for a while
dispatchedCountNoxu1, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu1.Name) reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
require.NoError(t, err)
dispatchedCountNoxu2, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu2.Name)
require.NoError(t, err)
assert.Equal(t, 1000, dispatchedCountNoxu1) noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name]
assert.Equal(t, 100, dispatchedCountNoxu2) noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name]
if (noxu1RequestCount / 2) > noxu2RequestCount {
t.Errorf("total requests made by noxu2 should at least half of noxu1: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
}
} }
func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface { func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface {
@ -118,14 +123,14 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf
return clientset.NewForConfigOrDie(config) return clientset.NewForConfigOrDie(config)
} }
func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName string) (int, error) { func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
resp, err := c.CoreV1(). resp, err := c.CoreV1().
RESTClient(). RESTClient().
Get(). Get().
RequestURI("/metrics"). RequestURI("/metrics").
DoRaw(context.TODO()) DoRaw(context.Background())
if err != nil { if err != nil {
return 0, err return nil, err
} }
dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
@ -134,41 +139,40 @@ func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName str
Opts: &expfmt.DecodeOptions{}, Opts: &expfmt.DecodeOptions{},
} }
reqCounts := make(map[string]int)
for { for {
var v model.Vector var v model.Vector
if err := decoder.Decode(&v); err != nil { if err := decoder.Decode(&v); err != nil {
if err == io.EOF { if err == io.EOF {
// Expected loop termination condition. // Expected loop termination condition.
return 0, fmt.Errorf("no dispatched-count metrics found for priorityLevel %v", priorityLevelName) return reqCounts, nil
} }
return 0, fmt.Errorf("failed decoding metrics: %v", err) return nil, fmt.Errorf("failed decoding metrics: %v", err)
} }
for _, metric := range v { for _, metric := range v {
switch name := string(metric.Metric[model.MetricNameLabel]); name { switch name := string(metric.Metric[model.MetricNameLabel]); name {
case dispatchedRequestCountMetricsName: case dispatchedRequestCountMetricsName:
if priorityLevelName == string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel]) { reqCounts[string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel])] = int(metric.Value)
return int(metric.Value), nil
}
} }
} }
} }
} }
func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) { func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) {
pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.TODO(), &flowcontrolv1alpha1.PriorityLevelConfiguration{ pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.Background(), &flowcontrolv1alpha1.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: username, Name: username,
}, },
Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{ Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{
Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited, Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited,
Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{ Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: 10, AssuredConcurrencyShares: int32(concurrencyShares),
LimitResponse: flowcontrolv1alpha1.LimitResponse{ LimitResponse: flowcontrolv1alpha1.LimitResponse{
Type: flowcontrolv1alpha1.LimitResponseTypeQueue, Type: flowcontrolv1alpha1.LimitResponseTypeQueue,
Queuing: &flowcontrolv1alpha1.QueuingConfiguration{ Queuing: &flowcontrolv1alpha1.QueuingConfiguration{
Queues: 100, Queues: 100,
HandSize: 1, HandSize: 1,
QueueLengthLimit: 10, QueueLengthLimit: int32(queuelength),
}, },
}, },
}, },
@ -232,17 +236,16 @@ func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, usern
}) })
} }
func streamRequests(wg *sync.WaitGroup, parallel, times int, request func()) { func streamRequests(parallel int, request func(), stopCh <-chan struct{}) {
for i := 0; i < parallel; i++ { for i := 0; i < parallel; i++ {
if wg != nil {
wg.Add(1)
}
go func() { go func() {
for j := 0; j < times; j++ { for {
request() select {
} case <-stopCh:
if wg != nil { return
wg.Done() default:
request()
}
} }
}() }()
} }