Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-20 18:31:15 +00:00
Also make some design changes exposed in testing and review.

Do not remove the ambiguous old metric `apiserver_flowcontrol_request_concurrency_limit`, because reviewers thought it was too early. This creates a problem: that metric cannot keep both of its old meanings. I chose the configured concurrency limit.

Testing has revealed a design flaw, which concerns the initialization of the seat demand state tracking. The current design in the KEP is as follows.

> Adjustment is also done on configuration change … For a newly
> introduced priority level, we set HighSeatDemand, AvgSeatDemand, and
> SmoothSeatDemand to NominalCL-LendableSD/2 and StDevSeatDemand to
> zero.

But this does not work out well at server startup. As part of its construction, the APF controller does a configuration change with zero objects read, to initialize its request-handling state. As always, the two mandatory priority levels are implicitly added whenever they are not read. So this initial reconfiguration has one non-exempt priority level, the mandatory one called catch-all, and that level gets its SmoothSeatDemand initialized to the whole server concurrency limit. From there it decays slowly, as per the regular design. So for a fairly long time it appears to have a high demand and competes strongly with the other priority levels. Its Target is higher than all the others once they start to show up. It properly gets a low NominalCL once other levels show up, which actually makes it compete harder for borrowing: it has an exceptionally high Target and a rather low NominalCL.

I have considered the following fix. The idea is that the designed initialization is not appropriate before all the default objects are read. So the fix is to have a mode bit in the controller. In the initial state, those seat demand tracking variables are set to zero. Once the config-producing controller detects that all the default objects are pre-existing, it flips the mode bit. In the later mode, the seat demand tracking variables are initialized as originally designed.

However, that would still give preferential treatment to the default PriorityLevelConfiguration objects over any that may be added later. So I have made a universal and simpler fix: always initialize those seat demand tracking variables to zero. Even if a lot of load shows up quickly, remember that adjustments are frequent (every 10 seconds) and the very next one will fully respond to that load.

Also: revise the logging logic to log at a numerically lower V level when there is a change.

Also: bug fix in float64close.

Also: separate imports in some file.

Co-authored-by: Han Kang <hankang@google.com>
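To make the chosen fix concrete, here is a minimal, self-contained sketch of the intended initialization and adjustment behavior: a newly seen priority level starts all of its seat demand tracking at zero, and each periodic adjustment (roughly every 10 seconds) folds the observed demand into the smoothed statistics. The type, field names, and smoothing formula below are illustrative simplifications, not the actual APF controller code.

// seatdemand_sketch.go: illustrative only; names and formulas are simplified
// stand-ins, not the real kube-apiserver APF implementation.
package main

import (
	"fmt"
	"math"
)

// seatDemandStats tracks smoothed seat demand for one priority level.
type seatDemandStats struct {
	avgSeatDemand    float64 // exponentially smoothed average demand
	stDevSeatDemand  float64 // smoothed standard deviation of demand
	highSeatDemand   float64 // high watermark observed in the last period
	smoothSeatDemand float64 // smoothed demand used when dividing borrowable seats
}

// newSeatDemandStats returns the tracking state for a newly seen priority
// level. Per the fix above, every field starts at zero (rather than being
// seeded with NominalCL-LendableSD/2), so neither the mandatory catch-all
// level present at server startup nor a level added later gets an artificial
// head start.
func newSeatDemandStats() seatDemandStats { return seatDemandStats{} }

// update folds one adjustment period's observations into the smoothed stats.
// alpha is a smoothing coefficient in (0,1); the exact formula in the KEP
// differs; this is just a generic exponential moving average.
func (s *seatDemandStats) update(observedAvg, observedHigh, alpha float64) {
	s.highSeatDemand = observedHigh
	s.avgSeatDemand = alpha*observedAvg + (1-alpha)*s.avgSeatDemand
	dev := observedAvg - s.avgSeatDemand
	s.stDevSeatDemand = math.Sqrt(alpha*dev*dev + (1-alpha)*s.stDevSeatDemand*s.stDevSeatDemand)
	// Demand spikes are taken at face value; quiet periods decay smoothly.
	s.smoothSeatDemand = math.Max(observedHigh, alpha*observedAvg+(1-alpha)*s.smoothSeatDemand)
}

func main() {
	catchAll := newSeatDemandStats() // zero demand, no head start
	// Even if heavy load arrives right away, the very next adjustment
	// (they run every ~10s) responds to it in full.
	catchAll.update(3.0, 7.0, 0.25)
	fmt.Printf("%+v\n", catchAll)
}

With this initialization, the mandatory catch-all level created during the initial zero-object reconfiguration carries no inherited demand, so it no longer out-competes the real priority levels for borrowable seats once their configuration objects are read.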
327 lines
10 KiB
Go
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"

	flowcontrol "k8s.io/api/flowcontrol/v1beta3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	genericfeatures "k8s.io/apiserver/pkg/features"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
	"k8s.io/kubernetes/test/integration/framework"
)

const (
	nominalConcurrencyMetricsName     = "apiserver_flowcontrol_nominal_limit_seats"
	dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
	rejectedRequestCountMetricsName   = "apiserver_flowcontrol_rejected_requests_total"
	labelPriorityLevel                = "priority_level"
	timeout                           = time.Second * 10
)

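// setup starts an in-process test kube-apiserver with the given read-only and
// mutating in-flight request limits and returns a loopback client config plus
// a teardown function.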
func setup(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int) (*rest.Config, framework.TearDownFunc) {
	_, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
			// Ensure all clients are allowed to send requests.
			opts.Authorization.Modes = []string{"AlwaysAllow"}
			opts.GenericServerRunOptions.MaxRequestsInFlight = maxReadonlyRequestsInFlight
			opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = maxMutatingRequestsInFlight
		},
	})
	return kubeConfig, tearDownFn
}

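// TestPriorityLevelIsolation creates two user-specific priority levels with
// equal nominal concurrency shares, floods one of them (the "elephant") while
// sending light traffic to the other (the "mouse"), and then checks the
// apiserver metrics to verify that neither stream was rejected and that the
// dispatched request counts of the two levels stay roughly balanced.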
func TestPriorityLevelIsolation(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
	// NOTE: disabling the feature should fail the test
	kubeConfig, closeFn := setup(t, 1, 1)
	defer closeFn()

	loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
	noxu1Client := getClientFor(kubeConfig, "noxu1")
	noxu2Client := getClientFor(kubeConfig, "noxu2")

	queueLength := 50
	concurrencyShares := 1

	priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
		loopbackClient, "noxu1", concurrencyShares, queueLength)
	if err != nil {
		t.Error(err)
	}
	priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
		loopbackClient, "noxu2", concurrencyShares, queueLength)
	if err != nil {
		t.Error(err)
	}

	nominalConcurrency, err := getNominalConcurrencyOfPriorityLevel(loopbackClient)
	if err != nil {
		t.Error(err)
	}

	if 1 != nominalConcurrency[priorityLevelNoxu1.Name] {
		t.Errorf("unexpected nominal concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu1.Name], 1)
	}
	if 1 != nominalConcurrency[priorityLevelNoxu2.Name] {
		t.Errorf("unexpected nominal concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu2.Name], 1)
	}

	stopCh := make(chan struct{})
	wg := sync.WaitGroup{}
	defer func() {
		close(stopCh)
		wg.Wait()
	}()

	// "elephant"
	wg.Add(concurrencyShares + queueLength)
	streamRequests(concurrencyShares+queueLength, func() {
		_, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
		if err != nil {
			t.Error(err)
		}
	}, &wg, stopCh)
	// "mouse"
	wg.Add(3)
	streamRequests(3, func() {
		_, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
		if err != nil {
			t.Error(err)
		}
	}, &wg, stopCh)

	time.Sleep(time.Second * 10) // running in background for a while

	allDispatchedReqCounts, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
	if err != nil {
		t.Error(err)
	}

	noxu1RequestCount := allDispatchedReqCounts[priorityLevelNoxu1.Name]
	noxu2RequestCount := allDispatchedReqCounts[priorityLevelNoxu2.Name]

	if rejectedReqCounts[priorityLevelNoxu1.Name] > 0 {
		t.Errorf(`%v requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu1.Name])
	}
	if rejectedReqCounts[priorityLevelNoxu2.Name] > 0 {
		t.Errorf(`%v requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name])
	}

	// Theoretically, the dispatched request counts of the two priority levels
	// should be equal. We are deliberately lax here to make flakes super rare.
	if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount {
		t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
	}
}

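// getClientFor returns a client that impersonates the given username, so that
// its requests are matched by that user's FlowSchema.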
func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface {
	config := rest.CopyConfig(loopbackConfig)
	config.Impersonate = rest.ImpersonationConfig{
		UserName: username,
	}
	return clientset.NewForConfigOrDie(config)
}

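// getMetrics fetches the raw Prometheus text exposition from the apiserver's
// /metrics endpoint.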
func getMetrics(c clientset.Interface) (string, error) {
	resp, err := c.CoreV1().
		RESTClient().
		Get().
		RequestURI("/metrics").
		DoRaw(context.Background())
	if err != nil {
		return "", err
	}
	return string(resp), nil
}

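// getNominalConcurrencyOfPriorityLevel scrapes the apiserver metrics and
// returns the nominal concurrency limit, in seats, keyed by priority level.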
func getNominalConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
	resp, err := getMetrics(c)
	if err != nil {
		return nil, err
	}

	dec := expfmt.NewDecoder(strings.NewReader(resp), expfmt.FmtText)
	decoder := expfmt.SampleDecoder{
		Dec:  dec,
		Opts: &expfmt.DecodeOptions{},
	}

	concurrency := make(map[string]int)
	for {
		var v model.Vector
		if err := decoder.Decode(&v); err != nil {
			if err == io.EOF {
				// Expected loop termination condition.
				return concurrency, nil
			}
			return nil, fmt.Errorf("failed decoding metrics: %v", err)
		}
		for _, metric := range v {
			switch name := string(metric.Metric[model.MetricNameLabel]); name {
			case nominalConcurrencyMetricsName:
				concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
			}
		}
	}
}

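// getRequestCountOfPriorityLevel scrapes the apiserver metrics and returns the
// dispatched and rejected request totals, each keyed by priority level.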
func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, map[string]int, error) {
	resp, err := getMetrics(c)
	if err != nil {
		return nil, nil, err
	}

	dec := expfmt.NewDecoder(strings.NewReader(resp), expfmt.FmtText)
	decoder := expfmt.SampleDecoder{
		Dec:  dec,
		Opts: &expfmt.DecodeOptions{},
	}

	allReqCounts := make(map[string]int)
	rejectReqCounts := make(map[string]int)
	for {
		var v model.Vector
		if err := decoder.Decode(&v); err != nil {
			if err == io.EOF {
				// Expected loop termination condition.
				return allReqCounts, rejectReqCounts, nil
			}
			return nil, nil, fmt.Errorf("failed decoding metrics: %v", err)
		}
		for _, metric := range v {
			switch name := string(metric.Metric[model.MetricNameLabel]); name {
			case dispatchedRequestCountMetricsName:
				allReqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
			case rejectedRequestCountMetricsName:
				rejectReqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
			}
		}
	}
}

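// createPriorityLevelAndBindingFlowSchemaForUser creates a queuing
// PriorityLevelConfiguration with the given concurrency shares and queue
// length (with borrowing disabled) and a FlowSchema that routes every request
// from the given user to it, then waits until the FlowSchema is no longer
// reported as dangling.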
func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queueLength int) (*flowcontrol.PriorityLevelConfiguration, *flowcontrol.FlowSchema, error) {
	i0 := int32(0)
	pl, err := c.FlowcontrolV1beta3().PriorityLevelConfigurations().Create(context.Background(), &flowcontrol.PriorityLevelConfiguration{
		ObjectMeta: metav1.ObjectMeta{
			Name: username,
		},
		Spec: flowcontrol.PriorityLevelConfigurationSpec{
			Type: flowcontrol.PriorityLevelEnablementLimited,
			Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
				NominalConcurrencyShares: int32(concurrencyShares),
				BorrowingLimitPercent:    &i0,
				LimitResponse: flowcontrol.LimitResponse{
					Type: flowcontrol.LimitResponseTypeQueue,
					Queuing: &flowcontrol.QueuingConfiguration{
						Queues:           100,
						HandSize:         1,
						QueueLengthLimit: int32(queueLength),
					},
				},
			},
		},
	}, metav1.CreateOptions{})
	if err != nil {
		return nil, nil, err
	}
	fs, err := c.FlowcontrolV1beta3().FlowSchemas().Create(context.TODO(), &flowcontrol.FlowSchema{
		ObjectMeta: metav1.ObjectMeta{
			Name: username,
		},
		Spec: flowcontrol.FlowSchemaSpec{
			DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
				Type: flowcontrol.FlowDistinguisherMethodByUserType,
			},
			MatchingPrecedence: 1000,
			PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
				Name: username,
			},
			Rules: []flowcontrol.PolicyRulesWithSubjects{
				{
					ResourceRules: []flowcontrol.ResourcePolicyRule{
						{
							Verbs:        []string{flowcontrol.VerbAll},
							APIGroups:    []string{flowcontrol.APIGroupAll},
							Resources:    []string{flowcontrol.ResourceAll},
							Namespaces:   []string{flowcontrol.NamespaceEvery},
							ClusterScope: true,
						},
					},
					Subjects: []flowcontrol.Subject{
						{
							Kind: flowcontrol.SubjectKindUser,
							User: &flowcontrol.UserSubject{
								Name: username,
							},
						},
					},
				},
			},
		},
	}, metav1.CreateOptions{})
	if err != nil {
		return nil, nil, err
	}

	return pl, fs, wait.Poll(time.Second, timeout, func() (bool, error) {
		fs, err := c.FlowcontrolV1beta3().FlowSchemas().Get(context.TODO(), username, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, condition := range fs.Status.Conditions {
			if condition.Type == flowcontrol.FlowSchemaConditionDangling {
				if condition.Status == flowcontrol.ConditionFalse {
					return true, nil
				}
			}
		}
		return false, nil
	})
}

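// streamRequests starts `parallel` goroutines that each invoke request in a
// tight loop until stopCh is closed, marking wg done as they exit.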
func streamRequests(parallel int, request func(), wg *sync.WaitGroup, stopCh <-chan struct{}) {
	for i := 0; i < parallel; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-stopCh:
					return
				default:
					request()
				}
			}
		}()
	}
}