Files
kubernetes/test/integration/apiserver/flowcontrol/concurrency_util_test.go
2022-08-02 14:29:03 +00:00

507 lines
18 KiB
Go

/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
authorizationv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission/plugin/webhook/testcerts"
"k8s.io/apiserver/pkg/authorization/authorizer"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/plugin/pkg/authorizer/webhook"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
v1 "k8s.io/client-go/tools/clientcmd/api/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/test/integration/framework"
)
const (
requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit"
requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum"
requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count"
priorityLevelSeatUtilSumName = "apiserver_flowcontrol_priority_level_seat_utilization_sum"
priorityLevelSeatUtilCountName = "apiserver_flowcontrol_priority_level_seat_utilization_count"
fakeworkDuration = 200 * time.Millisecond
testWarmUpTime = 2 * time.Second
testTime = 10 * time.Second
)
func setupWithAuthorizer(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int, authz authorizer.Authorizer) (*rest.Config, framework.TearDownFunc) {
_, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Ensure all clients are allowed to send requests.
opts.Authorization.Modes = []string{"AlwaysAllow"}
opts.GenericServerRunOptions.MaxRequestsInFlight = maxReadonlyRequestsInFlight
opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = maxMutatingRequestsInFlight
},
ModifyServerConfig: func(config *controlplane.Config) {
config.GenericConfig.Authorization.Authorizer = authz
},
})
return kubeConfig, tearDownFn
}
type SumAndCount struct {
Sum float64
Count int
}
type plMetrics struct {
execSeconds SumAndCount
seatUtil SumAndCount
availableSeats int
}
// metricSnapshot maps from a priority level label to
// a plMetrics struct containing APF metrics of interest
type metricSnapshot map[string]plMetrics
// Client request latency measurement
type clientLatencyMeasurement struct {
SumAndCount
SumSq float64 // latency sum of squares
Mu sync.Mutex
}
func (clm *clientLatencyMeasurement) reset() {
clm.Mu.Lock()
clm.Sum = 0
clm.Count = 0
clm.SumSq = 0
clm.Mu.Unlock()
}
func (clm *clientLatencyMeasurement) update(duration float64) {
clm.Mu.Lock()
clm.Count += 1
clm.Sum += duration
clm.SumSq += duration * duration
clm.Mu.Unlock()
}
func (clm *clientLatencyMeasurement) getStats() clientLatencyStats {
mean := clm.Sum / float64(clm.Count)
ss := clm.SumSq - mean*clm.Sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean
stdDev := math.Sqrt(ss / float64(clm.Count))
cv := stdDev / mean
return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv}
}
type clientLatencyStats struct {
mean float64 // latency average
stdDev float64 // latency standard deviation
cv float64 // latency coefficient of variation
}
type plMetricAvg struct {
reqExecution float64 // average request execution time
seatUtil float64 // average seat utilization
}
func intervalMetricAvg(snapshots map[string]metricSnapshot, t0 string, t1 string, plLabel string) plMetricAvg {
plmT0 := snapshots[t0][plLabel]
plmT1 := snapshots[t1][plLabel]
return plMetricAvg{
reqExecution: (plmT1.execSeconds.Sum - plmT0.execSeconds.Sum) / float64(plmT1.execSeconds.Count-plmT0.execSeconds.Count),
seatUtil: (plmT1.seatUtil.Sum - plmT0.seatUtil.Sum) / float64(plmT1.seatUtil.Count-plmT0.seatUtil.Count),
}
}
// This integration test checks the client-side expected concurrency and the server-side observed concurrency
// to make sure that they are close within a small error bound and that the priority levels are isolated.
// This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead
// of concurrency. In order to mitigate the effects of system noise, authorization webhook is used to artificially
// increase request execution time to make the system noise relatively insignificant.
// This test calculates the server-side observed concurrency from average priority level seat utilization APF metric.
// It also assumes that
// (server-side request execution throughput) == (client-side request throughput) and derives a formula to
// calculate the client-side expected concurrency. The two are compared and a small error bound is determined
// from estimating the noise using 2*(standard deviation of requenst latency)/(avg request latency).
func TestConcurrencyIsolation(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
// NOTE: disabling the feature should fail the test
// start webhook server
serv := &mockV1Service{allow: true, statusCode: 200}
s, err := NewV1TestServer(serv, testcerts.ServerCert, testcerts.ServerKey, testcerts.CACert)
if err != nil {
t.Fatal(err)
}
defer s.Close()
authorizer, err := newV1Authorizer(s.URL, testcerts.ClientCert, testcerts.ClientKey, testcerts.CACert, 0)
if err != nil {
t.Fatal(err)
}
kubeConfig, closeFn := setupWithAuthorizer(t, 10, 10, authorizer)
defer closeFn()
loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
noxu1Client := getClientFor(kubeConfig, "noxu1")
noxu2Client := getClientFor(kubeConfig, "noxu2")
queueLength := 50
concurrencyShares := 100
plNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
loopbackClient, "noxu1", concurrencyShares, queueLength)
if err != nil {
t.Error(err)
}
plNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
loopbackClient, "noxu2", concurrencyShares, queueLength)
if err != nil {
t.Error(err)
}
stopCh := make(chan struct{})
wg := sync.WaitGroup{}
// "elephant"
noxu1NumGoroutines := 5 + queueLength
var noxu1LatMeasure clientLatencyMeasurement
wg.Add(noxu1NumGoroutines)
streamRequests(noxu1NumGoroutines, func() {
start := time.Now()
_, err := noxu1Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{})
duration := time.Since(start).Seconds()
noxu1LatMeasure.update(duration)
if err != nil {
t.Error(err)
}
}, &wg, stopCh)
// "mouse"
noxu2NumGoroutines := 3
var noxu2LatMeasure clientLatencyMeasurement
wg.Add(noxu2NumGoroutines)
streamRequests(noxu2NumGoroutines, func() {
start := time.Now()
_, err := noxu2Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{})
duration := time.Since(start).Seconds()
noxu2LatMeasure.update(duration)
if err != nil {
t.Error(err)
}
}, &wg, stopCh)
// Warm up
time.Sleep(testWarmUpTime)
noxu1LatMeasure.reset()
noxu2LatMeasure.reset()
// Snapshots maps from a time label to a metricSnapshot
snapshots := make(map[string]metricSnapshot)
snapshots["t0"], err = getRequestMetricsSnapshot(loopbackClient)
if err != nil {
t.Error(err)
}
time.Sleep(testTime) // after warming up, the test enters a steady state
snapshots["t1"], err = getRequestMetricsSnapshot(loopbackClient)
if err != nil {
t.Error(err)
}
close(stopCh)
// Check the assumptions of the test
noxu1T0 := snapshots["t0"][plNoxu1.Name]
noxu1T1 := snapshots["t1"][plNoxu1.Name]
noxu2T0 := snapshots["t0"][plNoxu2.Name]
noxu2T1 := snapshots["t1"][plNoxu2.Name]
if noxu1T0.seatUtil.Count >= noxu1T1.seatUtil.Count || noxu2T0.seatUtil.Count >= noxu2T1.seatUtil.Count {
t.Errorf("SeatUtilCount check failed: noxu1 t0 count %d, t1 count %d; noxu2 t0 count %d, t1 count %d",
noxu1T0.seatUtil.Count, noxu1T1.seatUtil.Count, noxu2T0.seatUtil.Count, noxu2T1.seatUtil.Count)
}
t.Logf("noxu1 priority level concurrency limit: %d", noxu1T0.availableSeats)
t.Logf("noxu2 priority level concurrency limit: %d", noxu2T0.availableSeats)
if (noxu1T0.availableSeats != noxu1T1.availableSeats) || (noxu2T0.availableSeats != noxu2T1.availableSeats) {
t.Errorf("The number of available seats changed: noxu1 (%d, %d) noxu2 (%d, %d)",
noxu1T0.availableSeats, noxu1T1.availableSeats, noxu2T0.availableSeats, noxu2T1.availableSeats)
}
if (noxu1T0.availableSeats <= 4) || (noxu2T0.availableSeats <= 4) {
t.Errorf("The number of available seats for test client priority levels are too small: (%d, %d). Expecting a number > 4",
noxu1T0.availableSeats, noxu2T0.availableSeats)
}
// No reuqests should be rejected under normal situations
_, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
if err != nil {
t.Error(err)
}
if rejectedReqCounts[plNoxu1.Name] > 0 {
t.Errorf(`%d requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu1.Name])
}
if rejectedReqCounts[plNoxu2.Name] > 0 {
t.Errorf(`%d requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu2.Name])
}
// Calculate APF server side metric averages during the test interval
noxu1Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu1.Name)
noxu2Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu2.Name)
t.Logf("\nnoxu1 avg request execution time %v\nnoxu2 avg request execution time %v", noxu1Avg.reqExecution, noxu2Avg.reqExecution)
t.Logf("\nnoxu1 avg seat utilization %v\nnoxu2 avg seat utilization %v", noxu1Avg.seatUtil, noxu2Avg.seatUtil)
// Wait till the client goroutines finish before computing the client side request latency statistics
wg.Wait()
noxu1LatStats := noxu1LatMeasure.getStats()
noxu2LatStats := noxu2LatMeasure.getStats()
t.Logf("noxu1 client request count %d duration mean %v stddev %v cv %v", noxu1LatMeasure.Count, noxu1LatStats.mean, noxu1LatStats.stdDev, noxu1LatStats.cv)
t.Logf("noxu2 client request count %d duration mean %v stddev %v cv %v", noxu2LatMeasure.Count, noxu2LatStats.mean, noxu2LatStats.stdDev, noxu2LatStats.cv)
// Calculate server-side observed concurrency
noxu1ObservedConcurrency := noxu1Avg.seatUtil * float64(noxu1T0.availableSeats)
noxu2ObservedConcurrency := noxu2Avg.seatUtil * float64(noxu2T0.availableSeats)
// Expected concurrency is derived from equal throughput assumption on both the client-side and the server-side
// Expected concurrency computed can sometimes be larger than the number of available seats. We use the number of available seats as an upper bound
noxu1ExpectedConcurrency := float64(noxu1NumGoroutines) * noxu1Avg.reqExecution / noxu1LatStats.mean
noxu2ExpectedConcurrency := float64(noxu2NumGoroutines) * noxu2Avg.reqExecution / noxu2LatStats.mean
t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency)
// Calculate the tolerable error margin and perform the final check
margin := 2 * math.Min(noxu1LatStats.cv, noxu2LatStats.cv)
t.Logf("Error margin is %v", margin)
isConcurrencyExpected := func(name string, observed float64, expected float64) bool {
t.Logf("%v relative error is %v", name, math.Abs(expected-observed)/expected)
return math.Abs(expected-observed)/expected <= margin
}
if !isConcurrencyExpected(plNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) {
t.Errorf("Concurrency observed by noxu1 is off. Expected: %v, observed: %v", noxu1ExpectedConcurrency, noxu1ObservedConcurrency)
}
if !isConcurrencyExpected(plNoxu2.Name, noxu2ObservedConcurrency, noxu2ExpectedConcurrency) {
t.Errorf("Concurrency observed by noxu2 is off. Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency)
}
// Check server-side APF measurements
// For the elephant noxu1, the avg seat utilization should be close to 1.0
if math.Abs(1-noxu1Avg.seatUtil) > 0.05 {
t.Errorf("noxu1PLSeatUtilAvg=%v is too far from expected=1.0", noxu1Avg.seatUtil)
}
// For the mouse noxu2, the observed concurrency should be close to the number of goroutines it uses
if math.Abs(1-noxu2ObservedConcurrency/float64(noxu2NumGoroutines)) > 0.05 {
t.Errorf("noxu2ObservedConcurrency=%v is too far from noxu2NumGoroutines=%d", noxu2ObservedConcurrency, noxu2NumGoroutines)
}
}
func getRequestMetricsSnapshot(c clientset.Interface) (metricSnapshot, error) {
resp, err := getMetrics(c)
if err != nil {
return nil, err
}
dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}
snapshot := metricSnapshot{}
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
return snapshot, nil
}
return nil, fmt.Errorf("failed decoding metrics: %v", err)
}
for _, metric := range v {
plLabel := string(metric.Metric[labelPriorityLevel])
entry := plMetrics{}
if v, ok := snapshot[plLabel]; ok {
entry = v
}
switch name := string(metric.Metric[model.MetricNameLabel]); name {
case requestExecutionSecondsSumName:
entry.execSeconds.Sum = float64(metric.Value)
case requestExecutionSecondsCountName:
entry.execSeconds.Count = int(metric.Value)
case priorityLevelSeatUtilSumName:
entry.seatUtil.Sum = float64(metric.Value)
case priorityLevelSeatUtilCountName:
entry.seatUtil.Count = int(metric.Value)
case requestConcurrencyLimitMetricsName:
entry.availableSeats = int(metric.Value)
}
snapshot[plLabel] = entry
}
}
}
// Webhook authorizer code copied from staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go with minor changes
// V1Service mocks a remote service.
type V1Service interface {
Review(*authorizationv1.SubjectAccessReview)
HTTPStatusCode() int
}
// NewV1TestServer wraps a V1Service as an httptest.Server.
func NewV1TestServer(s V1Service, cert, key, caCert []byte) (*httptest.Server, error) {
const webhookPath = "/testserver"
var tlsConfig *tls.Config
if cert != nil {
cert, err := tls.X509KeyPair(cert, key)
if err != nil {
return nil, err
}
tlsConfig = &tls.Config{Certificates: []tls.Certificate{cert}}
}
if caCert != nil {
rootCAs := x509.NewCertPool()
rootCAs.AppendCertsFromPEM(caCert)
if tlsConfig == nil {
tlsConfig = &tls.Config{}
}
tlsConfig.ClientCAs = rootCAs
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}
serveHTTP := func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, fmt.Sprintf("unexpected method: %v", r.Method), http.StatusMethodNotAllowed)
return
}
if r.URL.Path != webhookPath {
http.Error(w, fmt.Sprintf("unexpected path: %v", r.URL.Path), http.StatusNotFound)
return
}
var review authorizationv1.SubjectAccessReview
bodyData, _ := ioutil.ReadAll(r.Body)
if err := json.Unmarshal(bodyData, &review); err != nil {
http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest)
return
}
// ensure we received the serialized review as expected
if review.APIVersion != "authorization.k8s.io/v1" {
http.Error(w, fmt.Sprintf("wrong api version: %s", string(bodyData)), http.StatusBadRequest)
return
}
// once we have a successful request, always call the review to record that we were called
s.Review(&review)
if s.HTTPStatusCode() < 200 || s.HTTPStatusCode() >= 300 {
http.Error(w, "HTTP Error", s.HTTPStatusCode())
return
}
type status struct {
Allowed bool `json:"allowed"`
Reason string `json:"reason"`
EvaluationError string `json:"evaluationError"`
}
resp := struct {
APIVersion string `json:"apiVersion"`
Status status `json:"status"`
}{
APIVersion: authorizationv1.SchemeGroupVersion.String(),
Status: status{review.Status.Allowed, review.Status.Reason, review.Status.EvaluationError},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
server := httptest.NewUnstartedServer(http.HandlerFunc(serveHTTP))
server.TLS = tlsConfig
server.StartTLS()
// Adjust the path to point to our custom path
serverURL, _ := url.Parse(server.URL)
serverURL.Path = webhookPath
server.URL = serverURL.String()
return server, nil
}
// A service that can be set to allow all or deny all authorization requests.
type mockV1Service struct {
allow bool
statusCode int
}
func (m *mockV1Service) Review(r *authorizationv1.SubjectAccessReview) {
if r.Spec.User == "noxu1" || r.Spec.User == "noxu2" {
time.Sleep(fakeworkDuration) // simulate fake work with sleep
}
r.Status.Allowed = m.allow
}
func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode }
// newV1Authorizer creates a temporary kubeconfig file from the provided arguments and attempts to load
// a new WebhookAuthorizer from it.
func newV1Authorizer(callbackURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration) (*webhook.WebhookAuthorizer, error) {
tempfile, err := ioutil.TempFile("", "")
if err != nil {
return nil, err
}
p := tempfile.Name()
defer os.Remove(p)
config := v1.Config{
Clusters: []v1.NamedCluster{
{
Cluster: v1.Cluster{Server: callbackURL, CertificateAuthorityData: ca},
},
},
AuthInfos: []v1.NamedAuthInfo{
{
AuthInfo: v1.AuthInfo{ClientCertificateData: clientCert, ClientKeyData: clientKey},
},
},
}
if err := json.NewEncoder(tempfile).Encode(config); err != nil {
return nil, err
}
clientConfig, err := webhookutil.LoadKubeconfig(p, nil)
if err != nil {
return nil, err
}
return webhook.New(clientConfig, "v1", cacheTime, cacheTime, testRetryBackoff)
}
var testRetryBackoff = wait.Backoff{
Duration: 5 * time.Millisecond,
Factor: 1.5,
Jitter: 0.2,
Steps: 5,
}