From f298d506e8f2113471f69723806546389178b992 Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Thu, 14 Jul 2022 08:54:12 -0400 Subject: [PATCH 01/10] APF concurrency isolation integration test (#1) * Add APF concurrency utilization test --- .../flowcontrol/concurrency_util_test.go | 491 ++++++++++++++++++ 1 file changed, 491 insertions(+) create mode 100644 test/integration/apiserver/flowcontrol/concurrency_util_test.go diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go new file mode 100644 index 00000000000..f549e4c0d10 --- /dev/null +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -0,0 +1,491 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package flowcontrol + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + authorizationv1 "k8s.io/api/authorization/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/admission/plugin/webhook/testcerts" + "k8s.io/apiserver/pkg/authorization/authorizer" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + webhookutil "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/apiserver/plugin/pkg/authorizer/webhook" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + v1 "k8s.io/client-go/tools/clientcmd/api/v1" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + "k8s.io/kubernetes/pkg/controlplane" + "k8s.io/kubernetes/test/integration/framework" +) + +const ( + requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum" + requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count" + priorityLevelSeatCountSamplesSumName = "apiserver_flowcontrol_priority_level_seat_count_samples_sum" + priorityLevelSeatCountSamplesCountName = "apiserver_flowcontrol_priority_level_seat_count_samples_count" + fakeworkDuration = 200 * time.Millisecond + testWarmUpTime = 2 * time.Second + testTime = 10 * time.Second +) + +func setupWithAuthorizer(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int, authz authorizer.Authorizer) (*rest.Config, framework.TearDownFunc) { + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts 
*options.ServerRunOptions) { + // Ensure all clients are allowed to send requests. + opts.Authorization.Modes = []string{"AlwaysAllow"} + opts.GenericServerRunOptions.MaxRequestsInFlight = maxReadonlyRequestsInFlight + opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = maxMutatingRequestsInFlight + }, + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authorization.Authorizer = authz + }, + }) + return kubeConfig, tearDownFn +} + +func TestConcurrencyIsolation(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() + // NOTE: disabling the feature should fail the test + // start webhook server + serv := &mockV1Service{allow: true, statusCode: 200} + s, err := NewV1TestServer(serv, testcerts.ServerCert, testcerts.ServerKey, testcerts.CACert) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + authorizer, err := newV1Authorizer(s.URL, testcerts.ClientCert, testcerts.ClientKey, testcerts.CACert, 0) + if err != nil { + t.Fatal(err) + } + + kubeConfig, closeFn := setupWithAuthorizer(t, 10, 10, authorizer) + defer closeFn() + + loopbackClient := clientset.NewForConfigOrDie(kubeConfig) + noxu1Client := getClientFor(kubeConfig, "noxu1") + noxu2Client := getClientFor(kubeConfig, "noxu2") + + queueLength := 50 + concurrencyShares := 100 + + priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu1", concurrencyShares, queueLength) + if err != nil { + t.Error(err) + } + priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu2", concurrencyShares, queueLength) + if err != nil { + t.Error(err) + } + availableSeats, err := getAvailableSeatsOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } + + t.Logf("noxu1 priority level concurrency limit: %v", availableSeats[priorityLevelNoxu1.Name]) + t.Logf("noxu2 priority level concurrency limit: 
%v", availableSeats[priorityLevelNoxu2.Name]) + if (availableSeats[priorityLevelNoxu1.Name] <= 4) || (availableSeats[priorityLevelNoxu2.Name] <= 4) { + t.Errorf("The number of available seats for test client priority levels are too small: (%v, %v). Expecting a number > 4", + availableSeats[priorityLevelNoxu1.Name], availableSeats[priorityLevelNoxu2.Name]) + } + + stopCh := make(chan struct{}) + wg := sync.WaitGroup{} + + // "elephant" + noxu1NumGoroutines := 5 + queueLength + var noxu1ClientRequestLatencySum float64 + var noxu1ClientRequestLatencySumSq float64 + var noxu1ClientRequestLatencyCount int32 + var noxu1Mutex sync.Mutex + streamRequestsWithIndex(noxu1NumGoroutines, func(idx int) { + start := time.Now() + _, err := noxu1Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{}) + duration := time.Since(start).Seconds() + noxu1Mutex.Lock() + noxu1ClientRequestLatencyCount += 1 + noxu1ClientRequestLatencySum += duration + noxu1ClientRequestLatencySumSq += duration * duration + noxu1Mutex.Unlock() + if err != nil { + t.Error(err) + } + }, &wg, stopCh) + // "mouse" + noxu2NumGoroutines := 3 + var noxu2ClientRequestLatencySum float64 + var noxu2ClientRequestLatencySumSq float64 + var noxu2ClientRequestLatencyCount int32 + var noxu2Mutex sync.Mutex + streamRequestsWithIndex(noxu2NumGoroutines, func(idx int) { + start := time.Now() + _, err := noxu2Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{}) + duration := time.Since(start).Seconds() + noxu2Mutex.Lock() + noxu2ClientRequestLatencyCount += 1 + noxu2ClientRequestLatencySum += duration + noxu2ClientRequestLatencySumSq += duration * duration + noxu2Mutex.Unlock() + if err != nil { + t.Error(err) + } + }, &wg, stopCh) + + // Warm up + time.Sleep(testWarmUpTime) + + // Reset counters + noxu1Mutex.Lock() + noxu1ClientRequestLatencyCount = 0 + noxu1ClientRequestLatencySum = 0 + noxu1ClientRequestLatencySumSq = 0 + noxu1Mutex.Unlock() + 
noxu2Mutex.Lock() + noxu2ClientRequestLatencyCount = 0 + noxu2ClientRequestLatencySum = 0 + noxu2ClientRequestLatencySumSq = 0 + noxu2Mutex.Unlock() + earlierRequestExecutionSecondsSum, earlierRequestExecutionSecondsCount, earlierPLSeatUtilSamplesSum, earlierPLSeatUtilSamplesCount, err := getRequestExecutionMetrics(loopbackClient) + if err != nil { + t.Error(err) + } + time.Sleep(testTime) // after warming up, the test enters a steady state + laterRequestExecutionSecondsSum, laterRequestExecutionSecondsCount, laterPLSeatUtilSamplesSum, laterPLSeatUtilSamplesCount, err := getRequestExecutionMetrics(loopbackClient) + if err != nil { + t.Error(err) + } + close(stopCh) + + noxu1RequestExecutionSecondsAvg := (laterRequestExecutionSecondsSum[priorityLevelNoxu1.Name] - earlierRequestExecutionSecondsSum[priorityLevelNoxu1.Name]) / float64(laterRequestExecutionSecondsCount[priorityLevelNoxu1.Name]-earlierRequestExecutionSecondsCount[priorityLevelNoxu1.Name]) + noxu2RequestExecutionSecondsAvg := (laterRequestExecutionSecondsSum[priorityLevelNoxu2.Name] - earlierRequestExecutionSecondsSum[priorityLevelNoxu2.Name]) / float64(laterRequestExecutionSecondsCount[priorityLevelNoxu2.Name]-earlierRequestExecutionSecondsCount[priorityLevelNoxu2.Name]) + noxu1PLSeatUtilAvg := (laterPLSeatUtilSamplesSum[priorityLevelNoxu1.Name] - earlierPLSeatUtilSamplesSum[priorityLevelNoxu1.Name]) / float64(laterPLSeatUtilSamplesCount[priorityLevelNoxu1.Name]-earlierPLSeatUtilSamplesCount[priorityLevelNoxu1.Name]) + noxu2PLSeatUtilAvg := (laterPLSeatUtilSamplesSum[priorityLevelNoxu2.Name] - earlierPLSeatUtilSamplesSum[priorityLevelNoxu2.Name]) / float64(laterPLSeatUtilSamplesCount[priorityLevelNoxu2.Name]-earlierPLSeatUtilSamplesCount[priorityLevelNoxu2.Name]) + t.Logf("\nnoxu1RequestExecutionSecondsAvg %v\nnoxu2RequestExecutionSecondsAvg %v", noxu1RequestExecutionSecondsAvg, noxu2RequestExecutionSecondsAvg) + t.Logf("\nnoxu1PLSeatUtilAvg %v\nnoxu2PLSeatUtilAvg %v", noxu1PLSeatUtilAvg, 
 noxu2PLSeatUtilAvg) + + wg.Wait() // wait till the client goroutines finish before computing the statistics + noxu1ClientRequestLatencySecondsAvg, noxu1ClientRequestLatencySecondsSdev := computeClientRequestLatencyStats(noxu1ClientRequestLatencyCount, noxu1ClientRequestLatencySum, noxu1ClientRequestLatencySumSq) + noxu2ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsSdev := computeClientRequestLatencyStats(noxu2ClientRequestLatencyCount, noxu2ClientRequestLatencySum, noxu2ClientRequestLatencySumSq) + t.Logf("\nnoxu1ClientRequestLatencyCount %v\nnoxu2ClientRequestLatencyCount %v", noxu1ClientRequestLatencyCount, noxu2ClientRequestLatencyCount) + t.Logf("\nnoxu1ClientRequestLatencySecondsAvg %v\nnoxu2ClientRequestLatencySecondsAvg %v", noxu1ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsAvg) + t.Logf("\nnoxu1ClientRequestLatencySecondsSdev %v\nnoxu2ClientRequestLatencySecondsSdev %v", noxu1ClientRequestLatencySecondsSdev, noxu2ClientRequestLatencySecondsSdev) + allDispatchedReqCounts, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } + t.Logf("\nnoxu1APFRequestCount %v\nnoxu2APFRequestCount %v", allDispatchedReqCounts[priorityLevelNoxu1.Name], allDispatchedReqCounts[priorityLevelNoxu2.Name]) + if rejectedReqCounts[priorityLevelNoxu1.Name] > 0 { + t.Errorf(`%v requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu1.Name]) + } + if rejectedReqCounts[priorityLevelNoxu2.Name] > 0 { + t.Errorf(`%v requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name]) + } + + // Calculate server-side observed concurrency + noxu1ObservedConcurrency := noxu1PLSeatUtilAvg * float64(availableSeats[priorityLevelNoxu1.Name]) + noxu2ObservedConcurrency := noxu2PLSeatUtilAvg * float64(availableSeats[priorityLevelNoxu2.Name]) + // Expected concurrency is derived from equal throughput assumption on both the 
client-side and the server-side + // Expected concurrency computed can sometimes be larger than the number of available seats. We use the number of available seats as an upper bound + noxu1ExpectedConcurrency := math.Min(float64(noxu1NumGoroutines)*noxu1RequestExecutionSecondsAvg/noxu1ClientRequestLatencySecondsAvg, float64(availableSeats[priorityLevelNoxu1.Name])) + noxu2ExpectedConcurrency := math.Min(float64(noxu2NumGoroutines)*noxu2RequestExecutionSecondsAvg/noxu2ClientRequestLatencySecondsAvg, float64(availableSeats[priorityLevelNoxu2.Name])) + t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency) + // Calculate the tolerable error margin and perform the final check + margin := 2 * math.Min(noxu1ClientRequestLatencySecondsSdev/noxu1ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsSdev/noxu2ClientRequestLatencySecondsAvg) + t.Logf("\nnoxu1Margin %v\nnoxu2Margin %v", noxu1ClientRequestLatencySecondsSdev/noxu1ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsSdev/noxu2ClientRequestLatencySecondsAvg) + t.Logf("Error margin is %v", margin) + + isConcurrencyExpected := func(name string, observed float64, expected float64) bool { + t.Logf("%v relative error is %v", name, math.Abs(expected-observed)/expected) + return math.Abs(expected-observed)/expected <= margin + } + if !isConcurrencyExpected(priorityLevelNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) { + t.Errorf("Concurrency observed by noxu1 is off. Expected: %v, observed: %v", noxu1ExpectedConcurrency, noxu1ObservedConcurrency) + } + if !isConcurrencyExpected(priorityLevelNoxu2.Name, noxu2ObservedConcurrency, noxu2ExpectedConcurrency) { + t.Errorf("Concurrency observed by noxu2 is off. 
Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency) + } + + // Check server-side APF measurements + if math.Abs(1-noxu1PLSeatUtilAvg) > 0.05 { + t.Errorf("noxu1PLSeatUtilAvg=%v is too far from expected=1.0", noxu1PLSeatUtilAvg) + } + if math.Abs(1-noxu2ObservedConcurrency/float64(noxu2NumGoroutines)) > 0.05 { + t.Errorf("noxu2ObservedConcurrency=%v is too far from noxu2NumGoroutines=%v", noxu2ObservedConcurrency, noxu2NumGoroutines) + } +} + +func computeClientRequestLatencyStats(count int32, sum, sumsq float64) (float64, float64) { + mean := sum / float64(count) + ss := sumsq - mean*sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean + return mean, math.Sqrt(ss / float64(count)) +} + +func getAvailableSeatsOfPriorityLevel(c clientset.Interface) (map[string]int, error) { + resp, err := getMetrics(c) + if err != nil { + return nil, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + concurrency := make(map[string]int) + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. 
+ return concurrency, nil + } + return nil, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case requestConcurrencyLimitMetricsName: + concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + } + } + } +} + +func getRequestExecutionMetrics(c clientset.Interface) (map[string]float64, map[string]int, map[string]float64, map[string]int, error) { + + resp, err := getMetrics(c) + if err != nil { + return nil, nil, nil, nil, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + RequestExecutionSecondsSum := make(map[string]float64) + RequestExecutionSecondsCount := make(map[string]int) + PriorityLevelSeatCountSamplesSum := make(map[string]float64) + PriorityLevelSeatCountSamplesCount := make(map[string]int) + + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. 
+ return RequestExecutionSecondsSum, RequestExecutionSecondsCount, + PriorityLevelSeatCountSamplesSum, PriorityLevelSeatCountSamplesCount, nil + } + return nil, nil, nil, nil, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case requestExecutionSecondsSumName: + RequestExecutionSecondsSum[string(metric.Metric[labelPriorityLevel])] = float64(metric.Value) + case requestExecutionSecondsCountName: + RequestExecutionSecondsCount[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + case priorityLevelSeatCountSamplesSumName: + PriorityLevelSeatCountSamplesSum[string(metric.Metric[labelPriorityLevel])] = float64(metric.Value) + case priorityLevelSeatCountSamplesCountName: + PriorityLevelSeatCountSamplesCount[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + } + } + } +} + +func streamRequestsWithIndex(parallel int, request func(idx int), wg *sync.WaitGroup, stopCh <-chan struct{}) { + wg.Add(parallel) + for i := 0; i < parallel; i++ { + go func(idx int) { + defer wg.Done() + for { + select { + case <-stopCh: + return + default: + request(idx) + } + } + }(i) + } +} + +// V1Service mocks a remote service. +type V1Service interface { + Review(*authorizationv1.SubjectAccessReview) + HTTPStatusCode() int +} + +// NewV1TestServer wraps a V1Service as an httptest.Server. 
+func NewV1TestServer(s V1Service, cert, key, caCert []byte) (*httptest.Server, error) { + const webhookPath = "/testserver" + var tlsConfig *tls.Config + if cert != nil { + cert, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + tlsConfig = &tls.Config{Certificates: []tls.Certificate{cert}} + } + + if caCert != nil { + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(caCert) + if tlsConfig == nil { + tlsConfig = &tls.Config{} + } + tlsConfig.ClientCAs = rootCAs + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + + serveHTTP := func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, fmt.Sprintf("unexpected method: %v", r.Method), http.StatusMethodNotAllowed) + return + } + if r.URL.Path != webhookPath { + http.Error(w, fmt.Sprintf("unexpected path: %v", r.URL.Path), http.StatusNotFound) + return + } + + var review authorizationv1.SubjectAccessReview + bodyData, _ := ioutil.ReadAll(r.Body) + if err := json.Unmarshal(bodyData, &review); err != nil { + http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest) + return + } + + // ensure we received the serialized review as expected + if review.APIVersion != "authorization.k8s.io/v1" { + http.Error(w, fmt.Sprintf("wrong api version: %s", string(bodyData)), http.StatusBadRequest) + return + } + // once we have a successful request, always call the review to record that we were called + s.Review(&review) + if s.HTTPStatusCode() < 200 || s.HTTPStatusCode() >= 300 { + http.Error(w, "HTTP Error", s.HTTPStatusCode()) + return + } + type status struct { + Allowed bool `json:"allowed"` + Reason string `json:"reason"` + EvaluationError string `json:"evaluationError"` + } + resp := struct { + APIVersion string `json:"apiVersion"` + Status status `json:"status"` + }{ + APIVersion: authorizationv1.SchemeGroupVersion.String(), + Status: status{review.Status.Allowed, review.Status.Reason, review.Status.EvaluationError}, + } + 
w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + } + + server := httptest.NewUnstartedServer(http.HandlerFunc(serveHTTP)) + server.TLS = tlsConfig + server.StartTLS() + + // Adjust the path to point to our custom path + serverURL, _ := url.Parse(server.URL) + serverURL.Path = webhookPath + server.URL = serverURL.String() + + return server, nil +} + +// A service that can be set to allow all or deny all authorization requests. +type mockV1Service struct { + allow bool + statusCode int + called int +} + +func (m *mockV1Service) Review(r *authorizationv1.SubjectAccessReview) { + if r.Spec.User == "noxu1" || r.Spec.User == "noxu2" { + time.Sleep(fakeworkDuration) // simulate fake work with sleep + } + m.called++ + r.Status.Allowed = m.allow +} +func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode } + +// newV1Authorizer creates a temporary kubeconfig file from the provided arguments and attempts to load +// a new WebhookAuthorizer from it. 
+func newV1Authorizer(callbackURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration) (*webhook.WebhookAuthorizer, error) { + tempfile, err := ioutil.TempFile("", "") + if err != nil { + return nil, err + } + p := tempfile.Name() + defer os.Remove(p) + config := v1.Config{ + Clusters: []v1.NamedCluster{ + { + Cluster: v1.Cluster{Server: callbackURL, CertificateAuthorityData: ca}, + }, + }, + AuthInfos: []v1.NamedAuthInfo{ + { + AuthInfo: v1.AuthInfo{ClientCertificateData: clientCert, ClientKeyData: clientKey}, + }, + }, + } + if err := json.NewEncoder(tempfile).Encode(config); err != nil { + return nil, err + } + clientConfig, err := webhookutil.LoadKubeconfig(p, nil) + if err != nil { + return nil, err + } + + return webhook.New(clientConfig, "v1", cacheTime, cacheTime, testRetryBackoff) +} + +var testRetryBackoff = wait.Backoff{ + Duration: 5 * time.Millisecond, + Factor: 1.5, + Jitter: 0.2, + Steps: 5, +} From 6dfd11cc61f7b384fca57a99804b5c8a83c43feb Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Fri, 15 Jul 2022 14:45:54 +0000 Subject: [PATCH 02/10] Allow apf concurrency util test to import prometheus --- hack/verify-prometheus-imports.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/hack/verify-prometheus-imports.sh b/hack/verify-prometheus-imports.sh index 3572a3d905f..b29f8949f90 100755 --- a/hack/verify-prometheus-imports.sh +++ b/hack/verify-prometheus-imports.sh @@ -74,6 +74,7 @@ allowed_prometheus_importers=( ./test/e2e_node/resource_metrics_test.go ./test/instrumentation/main_test.go ./test/integration/apiserver/flowcontrol/concurrency_test.go + ./test/integration/apiserver/flowcontrol/concurrency_util_test.go ./test/integration/metrics/metrics_test.go ) From b0c211a1e5b8e365ff4d21da80ff516c57c40294 Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Fri, 15 Jul 2022 14:46:32 +0000 Subject: [PATCH 03/10] Fix PL seat util metric retrieval --- 
.../flowcontrol/concurrency_util_test.go | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index f549e4c0d10..06f8e31a433 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -56,14 +56,14 @@ import ( ) const ( - requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit" - requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum" - requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count" - priorityLevelSeatCountSamplesSumName = "apiserver_flowcontrol_priority_level_seat_count_samples_sum" - priorityLevelSeatCountSamplesCountName = "apiserver_flowcontrol_priority_level_seat_count_samples_count" - fakeworkDuration = 200 * time.Millisecond - testWarmUpTime = 2 * time.Second - testTime = 10 * time.Second + requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum" + requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count" + priorityLevelSeatUtilSumName = "apiserver_flowcontrol_priority_level_request_count_samples_sum" + priorityLevelSeatUtilCountName = "apiserver_flowcontrol_priority_level_request_count_samples_count" + fakeworkDuration = 200 * time.Millisecond + testWarmUpTime = 2 * time.Second + testTime = 10 * time.Second ) func setupWithAuthorizer(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int, authz authorizer.Authorizer) (*rest.Config, framework.TearDownFunc) { @@ -185,21 +185,25 @@ func TestConcurrencyIsolation(t *testing.T) { noxu2ClientRequestLatencySum = 0 noxu2ClientRequestLatencySumSq = 0 noxu2Mutex.Unlock() - 
earlierRequestExecutionSecondsSum, earlierRequestExecutionSecondsCount, earlierPLSeatUtilSamplesSum, earlierPLSeatUtilSamplesCount, err := getRequestExecutionMetrics(loopbackClient) + earlierRequestExecutionSecondsSum, earlierRequestExecutionSecondsCount, earlierPLSeatUtilSum, earlierPLSeatUtilCount, err := getRequestExecutionMetrics(loopbackClient) if err != nil { t.Error(err) } time.Sleep(testTime) // after warming up, the test enters a steady state - laterRequestExecutionSecondsSum, laterRequestExecutionSecondsCount, laterPLSeatUtilSamplesSum, laterPLSeatUtilSamplesCount, err := getRequestExecutionMetrics(loopbackClient) + laterRequestExecutionSecondsSum, laterRequestExecutionSecondsCount, laterPLSeatUtilSum, laterPLSeatUtilCount, err := getRequestExecutionMetrics(loopbackClient) if err != nil { t.Error(err) } + if (earlierPLSeatUtilCount[priorityLevelNoxu1.Name] >= laterPLSeatUtilCount[priorityLevelNoxu1.Name]) || (earlierPLSeatUtilCount[priorityLevelNoxu2.Name] >= laterPLSeatUtilCount[priorityLevelNoxu2.Name]) { + t.Errorf("PLSeatUtilCount check failed: noxu1 earlier count %v, later count %v; noxu2 earlier count %v, later count %v", + earlierPLSeatUtilCount[priorityLevelNoxu1.Name], laterPLSeatUtilCount[priorityLevelNoxu1.Name], earlierPLSeatUtilCount[priorityLevelNoxu2.Name], laterPLSeatUtilCount[priorityLevelNoxu2.Name]) + } close(stopCh) noxu1RequestExecutionSecondsAvg := (laterRequestExecutionSecondsSum[priorityLevelNoxu1.Name] - earlierRequestExecutionSecondsSum[priorityLevelNoxu1.Name]) / float64(laterRequestExecutionSecondsCount[priorityLevelNoxu1.Name]-earlierRequestExecutionSecondsCount[priorityLevelNoxu1.Name]) noxu2RequestExecutionSecondsAvg := (laterRequestExecutionSecondsSum[priorityLevelNoxu2.Name] - earlierRequestExecutionSecondsSum[priorityLevelNoxu2.Name]) / float64(laterRequestExecutionSecondsCount[priorityLevelNoxu2.Name]-earlierRequestExecutionSecondsCount[priorityLevelNoxu2.Name]) - noxu1PLSeatUtilAvg := 
(laterPLSeatUtilSamplesSum[priorityLevelNoxu1.Name] - earlierPLSeatUtilSamplesSum[priorityLevelNoxu1.Name]) / float64(laterPLSeatUtilSamplesCount[priorityLevelNoxu1.Name]-earlierPLSeatUtilSamplesCount[priorityLevelNoxu1.Name]) - noxu2PLSeatUtilAvg := (laterPLSeatUtilSamplesSum[priorityLevelNoxu2.Name] - earlierPLSeatUtilSamplesSum[priorityLevelNoxu2.Name]) / float64(laterPLSeatUtilSamplesCount[priorityLevelNoxu2.Name]-earlierPLSeatUtilSamplesCount[priorityLevelNoxu2.Name]) + noxu1PLSeatUtilAvg := (laterPLSeatUtilSum[priorityLevelNoxu1.Name] - earlierPLSeatUtilSum[priorityLevelNoxu1.Name]) / float64(laterPLSeatUtilCount[priorityLevelNoxu1.Name]-earlierPLSeatUtilCount[priorityLevelNoxu1.Name]) + noxu2PLSeatUtilAvg := (laterPLSeatUtilSum[priorityLevelNoxu2.Name] - earlierPLSeatUtilSum[priorityLevelNoxu2.Name]) / float64(laterPLSeatUtilCount[priorityLevelNoxu2.Name]-earlierPLSeatUtilCount[priorityLevelNoxu2.Name]) t.Logf("\nnoxu1RequestExecutionSecondsAvg %v\nnoxu2RequestExecutionSecondsAvg %v", noxu1RequestExecutionSecondsAvg, noxu2RequestExecutionSecondsAvg) t.Logf("\nnoxu1PLSeatUtilAvg %v\nnoxu2PLSeatUtilAvg %v", noxu1PLSeatUtilAvg, noxu2PLSeatUtilAvg) @@ -306,8 +310,8 @@ func getRequestExecutionMetrics(c clientset.Interface) (map[string]float64, map[ RequestExecutionSecondsSum := make(map[string]float64) RequestExecutionSecondsCount := make(map[string]int) - PriorityLevelSeatCountSamplesSum := make(map[string]float64) - PriorityLevelSeatCountSamplesCount := make(map[string]int) + PLSeatUtilSum := make(map[string]float64) + PLSeatUtilCount := make(map[string]int) for { var v model.Vector @@ -315,7 +319,7 @@ func getRequestExecutionMetrics(c clientset.Interface) (map[string]float64, map[ if err == io.EOF { // Expected loop termination condition. 
return RequestExecutionSecondsSum, RequestExecutionSecondsCount, - PriorityLevelSeatCountSamplesSum, PriorityLevelSeatCountSamplesCount, nil + PLSeatUtilSum, PLSeatUtilCount, nil } return nil, nil, nil, nil, fmt.Errorf("failed decoding metrics: %v", err) } @@ -325,10 +329,10 @@ func getRequestExecutionMetrics(c clientset.Interface) (map[string]float64, map[ RequestExecutionSecondsSum[string(metric.Metric[labelPriorityLevel])] = float64(metric.Value) case requestExecutionSecondsCountName: RequestExecutionSecondsCount[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) - case priorityLevelSeatCountSamplesSumName: - PriorityLevelSeatCountSamplesSum[string(metric.Metric[labelPriorityLevel])] = float64(metric.Value) - case priorityLevelSeatCountSamplesCountName: - PriorityLevelSeatCountSamplesCount[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + case priorityLevelSeatUtilSumName: + PLSeatUtilSum[string(metric.Metric[labelPriorityLevel])] = float64(metric.Value) + case priorityLevelSeatUtilCountName: + PLSeatUtilCount[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) } } } @@ -351,6 +355,7 @@ func streamRequestsWithIndex(parallel int, request func(idx int), wg *sync.WaitG } } +// Webhook authorizer code copied from staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go with minor changes // V1Service mocks a remote service. 
type V1Service interface { Review(*authorizationv1.SubjectAccessReview) From c94df76190d37d2b8cf08ce221163252d808eead Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Fri, 15 Jul 2022 14:47:03 +0000 Subject: [PATCH 04/10] Update vendor list --- vendor/modules.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/vendor/modules.txt b/vendor/modules.txt index 6ead768df4a..202c44bbf23 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1562,6 +1562,7 @@ k8s.io/apiserver/pkg/admission/plugin/webhook/namespace k8s.io/apiserver/pkg/admission/plugin/webhook/object k8s.io/apiserver/pkg/admission/plugin/webhook/request k8s.io/apiserver/pkg/admission/plugin/webhook/rules +k8s.io/apiserver/pkg/admission/plugin/webhook/testcerts k8s.io/apiserver/pkg/admission/plugin/webhook/validating k8s.io/apiserver/pkg/admission/testing k8s.io/apiserver/pkg/apis/apiserver From 4fc7fd25bf8f2f6831a79bd0c5ba19bb09bf7b7a Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Fri, 15 Jul 2022 18:27:29 +0000 Subject: [PATCH 05/10] Fix the apf metrics names --- .../apiserver/flowcontrol/concurrency_util_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index 06f8e31a433..5ff9d65464a 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -59,8 +59,8 @@ const ( requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit" requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum" requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count" - priorityLevelSeatUtilSumName = "apiserver_flowcontrol_priority_level_request_count_samples_sum" - priorityLevelSeatUtilCountName = 
"apiserver_flowcontrol_priority_level_request_count_samples_count" + priorityLevelSeatUtilSumName = "apiserver_flowcontrol_priority_level_seat_utilization_sum" + priorityLevelSeatUtilCountName = "apiserver_flowcontrol_priority_level_seat_utilization_count" fakeworkDuration = 200 * time.Millisecond testWarmUpTime = 2 * time.Second testTime = 10 * time.Second From ed74de833dcbd1ac6b295a7ac525efe7f6c1152f Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Sat, 30 Jul 2022 00:13:48 +0000 Subject: [PATCH 06/10] Add comment for the reason this test is needed --- .../apiserver/flowcontrol/concurrency_util_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index 5ff9d65464a..2ca44a24f74 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -81,6 +81,16 @@ func setupWithAuthorizer(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingR return kubeConfig, tearDownFn } +// This integration test checks the client-side expected concurrency and the server-side observed concurrency +// to make sure that they are close within a small error bound and that the priority levels are isolated. +// This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead +// of concurrency. In order to mitigate the effects of system noise, authorization webhook is used to artificially +// increase request execution time to make the system noise relatively insignificant. +// This test calculates the server-side observed concurrency from average priority level seat utilization APF metric. +// It also assumes that +// (server-side request execution throughput) == (client-side request throughput) and derives a formula to +// calculate the client-side expected concurrency. 
The two are compared and a small error bound is determined +// from estimating the noise using 2*(standard deviation of requenst latency)/(avg request latency). func TestConcurrencyIsolation(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() // NOTE: disabling the feature should fail the test From 07e2bfe1cc09ff23e0de8f1a5488b83a01469d4a Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Tue, 2 Aug 2022 14:29:03 +0000 Subject: [PATCH 07/10] Refactor and clean up the code --- .../flowcontrol/concurrency_util_test.go | 294 +++++++++--------- 1 file changed, 147 insertions(+), 147 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index 2ca44a24f74..f64d838c260 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -81,6 +81,72 @@ func setupWithAuthorizer(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingR return kubeConfig, tearDownFn } +type SumAndCount struct { + Sum float64 + Count int +} + +type plMetrics struct { + execSeconds SumAndCount + seatUtil SumAndCount + availableSeats int +} + +// metricSnapshot maps from a priority level label to +// a plMetrics struct containing APF metrics of interest +type metricSnapshot map[string]plMetrics + +// Client request latency measurement +type clientLatencyMeasurement struct { + SumAndCount + SumSq float64 // latency sum of squares + Mu sync.Mutex +} + +func (clm *clientLatencyMeasurement) reset() { + clm.Mu.Lock() + clm.Sum = 0 + clm.Count = 0 + clm.SumSq = 0 + clm.Mu.Unlock() +} + +func (clm *clientLatencyMeasurement) update(duration float64) { + clm.Mu.Lock() + clm.Count += 1 + clm.Sum += duration + clm.SumSq += duration * duration + clm.Mu.Unlock() +} + +func (clm 
*clientLatencyMeasurement) getStats() clientLatencyStats { + mean := clm.Sum / float64(clm.Count) + ss := clm.SumSq - mean*clm.Sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean + stdDev := math.Sqrt(ss / float64(clm.Count)) + cv := stdDev / mean + return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv} +} + +type clientLatencyStats struct { + mean float64 // latency average + stdDev float64 // latency standard deviation + cv float64 // latency coefficient of variation +} + +type plMetricAvg struct { + reqExecution float64 // average request execution time + seatUtil float64 // average seat utilization +} + +func intervalMetricAvg(snapshots map[string]metricSnapshot, t0 string, t1 string, plLabel string) plMetricAvg { + plmT0 := snapshots[t0][plLabel] + plmT1 := snapshots[t1][plLabel] + return plMetricAvg{ + reqExecution: (plmT1.execSeconds.Sum - plmT0.execSeconds.Sum) / float64(plmT1.execSeconds.Count-plmT0.execSeconds.Count), + seatUtil: (plmT1.seatUtil.Sum - plmT0.seatUtil.Sum) / float64(plmT1.seatUtil.Count-plmT0.seatUtil.Count), + } +} + // This integration test checks the client-side expected concurrency and the server-side observed concurrency // to make sure that they are close within a small error bound and that the priority levels are isolated. 
// This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead @@ -117,65 +183,42 @@ func TestConcurrencyIsolation(t *testing.T) { queueLength := 50 concurrencyShares := 100 - priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + plNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( loopbackClient, "noxu1", concurrencyShares, queueLength) if err != nil { t.Error(err) } - priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + plNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( loopbackClient, "noxu2", concurrencyShares, queueLength) if err != nil { t.Error(err) } - availableSeats, err := getAvailableSeatsOfPriorityLevel(loopbackClient) - if err != nil { - t.Error(err) - } - - t.Logf("noxu1 priority level concurrency limit: %v", availableSeats[priorityLevelNoxu1.Name]) - t.Logf("noxu2 priority level concurrency limit: %v", availableSeats[priorityLevelNoxu2.Name]) - if (availableSeats[priorityLevelNoxu1.Name] <= 4) || (availableSeats[priorityLevelNoxu2.Name] <= 4) { - t.Errorf("The number of available seats for test client priority levels are too small: (%v, %v). 
Expecting a number > 4", - availableSeats[priorityLevelNoxu1.Name], availableSeats[priorityLevelNoxu2.Name]) - } stopCh := make(chan struct{}) wg := sync.WaitGroup{} // "elephant" noxu1NumGoroutines := 5 + queueLength - var noxu1ClientRequestLatencySum float64 - var noxu1ClientRequestLatencySumSq float64 - var noxu1ClientRequestLatencyCount int32 - var noxu1Mutex sync.Mutex - streamRequestsWithIndex(noxu1NumGoroutines, func(idx int) { + var noxu1LatMeasure clientLatencyMeasurement + wg.Add(noxu1NumGoroutines) + streamRequests(noxu1NumGoroutines, func() { start := time.Now() _, err := noxu1Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{}) duration := time.Since(start).Seconds() - noxu1Mutex.Lock() - noxu1ClientRequestLatencyCount += 1 - noxu1ClientRequestLatencySum += duration - noxu1ClientRequestLatencySumSq += duration * duration - noxu1Mutex.Unlock() + noxu1LatMeasure.update(duration) if err != nil { t.Error(err) } }, &wg, stopCh) // "mouse" noxu2NumGoroutines := 3 - var noxu2ClientRequestLatencySum float64 - var noxu2ClientRequestLatencySumSq float64 - var noxu2ClientRequestLatencyCount int32 - var noxu2Mutex sync.Mutex - streamRequestsWithIndex(noxu2NumGoroutines, func(idx int) { + var noxu2LatMeasure clientLatencyMeasurement + wg.Add(noxu2NumGoroutines) + streamRequests(noxu2NumGoroutines, func() { start := time.Now() _, err := noxu2Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{}) duration := time.Since(start).Seconds() - noxu2Mutex.Lock() - noxu2ClientRequestLatencyCount += 1 - noxu2ClientRequestLatencySum += duration - noxu2ClientRequestLatencySumSq += duration * duration - noxu2Mutex.Unlock() + noxu2LatMeasure.update(duration) if err != nil { t.Error(err) } @@ -184,97 +227,101 @@ func TestConcurrencyIsolation(t *testing.T) { // Warm up time.Sleep(testWarmUpTime) - // Reset counters - noxu1Mutex.Lock() - noxu1ClientRequestLatencyCount = 0 - noxu1ClientRequestLatencySum = 0 - 
noxu1ClientRequestLatencySumSq = 0 - noxu1Mutex.Unlock() - noxu2Mutex.Lock() - noxu2ClientRequestLatencyCount = 0 - noxu2ClientRequestLatencySum = 0 - noxu2ClientRequestLatencySumSq = 0 - noxu2Mutex.Unlock() - earlierRequestExecutionSecondsSum, earlierRequestExecutionSecondsCount, earlierPLSeatUtilSum, earlierPLSeatUtilCount, err := getRequestExecutionMetrics(loopbackClient) + noxu1LatMeasure.reset() + noxu2LatMeasure.reset() + // Snapshots maps from a time label to a metricSnapshot + snapshots := make(map[string]metricSnapshot) + snapshots["t0"], err = getRequestMetricsSnapshot(loopbackClient) if err != nil { t.Error(err) } time.Sleep(testTime) // after warming up, the test enters a steady state - laterRequestExecutionSecondsSum, laterRequestExecutionSecondsCount, laterPLSeatUtilSum, laterPLSeatUtilCount, err := getRequestExecutionMetrics(loopbackClient) + snapshots["t1"], err = getRequestMetricsSnapshot(loopbackClient) if err != nil { t.Error(err) } - if (earlierPLSeatUtilCount[priorityLevelNoxu1.Name] >= laterPLSeatUtilCount[priorityLevelNoxu1.Name]) || (earlierPLSeatUtilCount[priorityLevelNoxu2.Name] >= laterPLSeatUtilCount[priorityLevelNoxu2.Name]) { - t.Errorf("PLSeatUtilCount check failed: noxu1 earlier count %v, later count %v; noxu2 earlier count %v, later count %v", - earlierPLSeatUtilCount[priorityLevelNoxu1.Name], laterPLSeatUtilCount[priorityLevelNoxu1.Name], earlierPLSeatUtilCount[priorityLevelNoxu2.Name], laterPLSeatUtilCount[priorityLevelNoxu2.Name]) - } close(stopCh) - noxu1RequestExecutionSecondsAvg := (laterRequestExecutionSecondsSum[priorityLevelNoxu1.Name] - earlierRequestExecutionSecondsSum[priorityLevelNoxu1.Name]) / float64(laterRequestExecutionSecondsCount[priorityLevelNoxu1.Name]-earlierRequestExecutionSecondsCount[priorityLevelNoxu1.Name]) - noxu2RequestExecutionSecondsAvg := (laterRequestExecutionSecondsSum[priorityLevelNoxu2.Name] - earlierRequestExecutionSecondsSum[priorityLevelNoxu2.Name]) / 
float64(laterRequestExecutionSecondsCount[priorityLevelNoxu2.Name]-earlierRequestExecutionSecondsCount[priorityLevelNoxu2.Name]) - noxu1PLSeatUtilAvg := (laterPLSeatUtilSum[priorityLevelNoxu1.Name] - earlierPLSeatUtilSum[priorityLevelNoxu1.Name]) / float64(laterPLSeatUtilCount[priorityLevelNoxu1.Name]-earlierPLSeatUtilCount[priorityLevelNoxu1.Name]) - noxu2PLSeatUtilAvg := (laterPLSeatUtilSum[priorityLevelNoxu2.Name] - earlierPLSeatUtilSum[priorityLevelNoxu2.Name]) / float64(laterPLSeatUtilCount[priorityLevelNoxu2.Name]-earlierPLSeatUtilCount[priorityLevelNoxu2.Name]) - t.Logf("\nnoxu1RequestExecutionSecondsAvg %v\nnoxu2RequestExecutionSecondsAvg %v", noxu1RequestExecutionSecondsAvg, noxu2RequestExecutionSecondsAvg) - t.Logf("\nnoxu1PLSeatUtilAvg %v\nnoxu2PLSeatUtilAvg %v", noxu1PLSeatUtilAvg, noxu2PLSeatUtilAvg) - - wg.Wait() // wait till the client goroutines finish before computing the statistics - noxu1ClientRequestLatencySecondsAvg, noxu1ClientRequestLatencySecondsSdev := computeClientRequestLatencyStats(noxu1ClientRequestLatencyCount, noxu1ClientRequestLatencySum, noxu1ClientRequestLatencySumSq) - noxu2ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsSdev := computeClientRequestLatencyStats(noxu2ClientRequestLatencyCount, noxu2ClientRequestLatencySum, noxu2ClientRequestLatencySumSq) - t.Logf("\nnoxu1ClientRequestLatencyCount %v\nnoxu2ClientRequestLatencyCount %v", noxu1ClientRequestLatencyCount, noxu2ClientRequestLatencyCount) - t.Logf("\nnoxu1ClientRequestLatencySecondsAvg %v\nnoxu2ClientRequestLatencySecondsAvg %v", noxu1ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsAvg) - t.Logf("\nnoxu1ClientRequestLatencySecondsSdev %v\nnoxu2ClientRequestLatencySecondsSdev %v", noxu1ClientRequestLatencySecondsSdev, noxu2ClientRequestLatencySecondsSdev) - allDispatchedReqCounts, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) + // Check the assumptions of the test + noxu1T0 := snapshots["t0"][plNoxu1.Name] + 
noxu1T1 := snapshots["t1"][plNoxu1.Name] + noxu2T0 := snapshots["t0"][plNoxu2.Name] + noxu2T1 := snapshots["t1"][plNoxu2.Name] + if noxu1T0.seatUtil.Count >= noxu1T1.seatUtil.Count || noxu2T0.seatUtil.Count >= noxu2T1.seatUtil.Count { + t.Errorf("SeatUtilCount check failed: noxu1 t0 count %d, t1 count %d; noxu2 t0 count %d, t1 count %d", + noxu1T0.seatUtil.Count, noxu1T1.seatUtil.Count, noxu2T0.seatUtil.Count, noxu2T1.seatUtil.Count) + } + t.Logf("noxu1 priority level concurrency limit: %d", noxu1T0.availableSeats) + t.Logf("noxu2 priority level concurrency limit: %d", noxu2T0.availableSeats) + if (noxu1T0.availableSeats != noxu1T1.availableSeats) || (noxu2T0.availableSeats != noxu2T1.availableSeats) { + t.Errorf("The number of available seats changed: noxu1 (%d, %d) noxu2 (%d, %d)", + noxu1T0.availableSeats, noxu1T1.availableSeats, noxu2T0.availableSeats, noxu2T1.availableSeats) + } + if (noxu1T0.availableSeats <= 4) || (noxu2T0.availableSeats <= 4) { + t.Errorf("The number of available seats for test client priority levels are too small: (%d, %d). 
Expecting a number > 4", + noxu1T0.availableSeats, noxu2T0.availableSeats) + } + // No reuqests should be rejected under normal situations + _, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) if err != nil { t.Error(err) } - t.Logf("\nnoxu1APFRequestCount %v\nnoxu2APFRequestCount %v", allDispatchedReqCounts[priorityLevelNoxu1.Name], allDispatchedReqCounts[priorityLevelNoxu2.Name]) - if rejectedReqCounts[priorityLevelNoxu1.Name] > 0 { - t.Errorf(`%v requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name]) + if rejectedReqCounts[plNoxu1.Name] > 0 { + t.Errorf(`%d requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu1.Name]) } - if rejectedReqCounts[priorityLevelNoxu2.Name] > 0 { - t.Errorf(`%v requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name]) + if rejectedReqCounts[plNoxu2.Name] > 0 { + t.Errorf(`%d requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu2.Name]) } + // Calculate APF server side metric averages during the test interval + noxu1Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu1.Name) + noxu2Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu2.Name) + t.Logf("\nnoxu1 avg request execution time %v\nnoxu2 avg request execution time %v", noxu1Avg.reqExecution, noxu2Avg.reqExecution) + t.Logf("\nnoxu1 avg seat utilization %v\nnoxu2 avg seat utilization %v", noxu1Avg.seatUtil, noxu2Avg.seatUtil) + + // Wait till the client goroutines finish before computing the client side request latency statistics + wg.Wait() + noxu1LatStats := noxu1LatMeasure.getStats() + noxu2LatStats := noxu2LatMeasure.getStats() + t.Logf("noxu1 client request count %d duration mean %v stddev %v cv %v", noxu1LatMeasure.Count, noxu1LatStats.mean, noxu1LatStats.stdDev, noxu1LatStats.cv) + t.Logf("noxu2 client request count %d duration mean %v stddev %v cv %v", noxu2LatMeasure.Count, 
noxu2LatStats.mean, noxu2LatStats.stdDev, noxu2LatStats.cv) + // Calculate server-side observed concurrency - noxu1ObservedConcurrency := noxu1PLSeatUtilAvg * float64(availableSeats[priorityLevelNoxu1.Name]) - noxu2ObservedConcurrency := noxu2PLSeatUtilAvg * float64(availableSeats[priorityLevelNoxu2.Name]) + noxu1ObservedConcurrency := noxu1Avg.seatUtil * float64(noxu1T0.availableSeats) + noxu2ObservedConcurrency := noxu2Avg.seatUtil * float64(noxu2T0.availableSeats) // Expected concurrency is derived from equal throughput assumption on both the client-side and the server-side // Expected concurrency computed can sometimes be larger than the number of available seats. We use the number of available seats as an upper bound - noxu1ExpectedConcurrency := math.Min(float64(noxu1NumGoroutines)*noxu1RequestExecutionSecondsAvg/noxu1ClientRequestLatencySecondsAvg, float64(availableSeats[priorityLevelNoxu1.Name])) - noxu2ExpectedConcurrency := math.Min(float64(noxu2NumGoroutines)*noxu2RequestExecutionSecondsAvg/noxu2ClientRequestLatencySecondsAvg, float64(availableSeats[priorityLevelNoxu2.Name])) + noxu1ExpectedConcurrency := float64(noxu1NumGoroutines) * noxu1Avg.reqExecution / noxu1LatStats.mean + noxu2ExpectedConcurrency := float64(noxu2NumGoroutines) * noxu2Avg.reqExecution / noxu2LatStats.mean t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency) // Calculate the tolerable error margin and perform the final check - margin := 2 * math.Min(noxu1ClientRequestLatencySecondsSdev/noxu1ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsSdev/noxu2ClientRequestLatencySecondsAvg) - t.Logf("\nnoxu1Margin %v\nnoxu2Margin %v", noxu1ClientRequestLatencySecondsSdev/noxu1ClientRequestLatencySecondsAvg, noxu2ClientRequestLatencySecondsSdev/noxu2ClientRequestLatencySecondsAvg) + margin := 2 * math.Min(noxu1LatStats.cv, noxu2LatStats.cv) t.Logf("Error 
margin is %v", margin) isConcurrencyExpected := func(name string, observed float64, expected float64) bool { t.Logf("%v relative error is %v", name, math.Abs(expected-observed)/expected) return math.Abs(expected-observed)/expected <= margin } - if !isConcurrencyExpected(priorityLevelNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) { + if !isConcurrencyExpected(plNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) { t.Errorf("Concurrency observed by noxu1 is off. Expected: %v, observed: %v", noxu1ExpectedConcurrency, noxu1ObservedConcurrency) } - if !isConcurrencyExpected(priorityLevelNoxu2.Name, noxu2ObservedConcurrency, noxu2ExpectedConcurrency) { + if !isConcurrencyExpected(plNoxu2.Name, noxu2ObservedConcurrency, noxu2ExpectedConcurrency) { t.Errorf("Concurrency observed by noxu2 is off. Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency) } // Check server-side APF measurements - if math.Abs(1-noxu1PLSeatUtilAvg) > 0.05 { - t.Errorf("noxu1PLSeatUtilAvg=%v is too far from expected=1.0", noxu1PLSeatUtilAvg) + // For the elephant noxu1, the avg seat utilization should be close to 1.0 + if math.Abs(1-noxu1Avg.seatUtil) > 0.05 { + t.Errorf("noxu1PLSeatUtilAvg=%v is too far from expected=1.0", noxu1Avg.seatUtil) } + // For the mouse noxu2, the observed concurrency should be close to the number of goroutines it uses if math.Abs(1-noxu2ObservedConcurrency/float64(noxu2NumGoroutines)) > 0.05 { - t.Errorf("noxu2ObservedConcurrency=%v is too far from noxu2NumGoroutines=%v", noxu2ObservedConcurrency, noxu2NumGoroutines) + t.Errorf("noxu2ObservedConcurrency=%v is too far from noxu2NumGoroutines=%d", noxu2ObservedConcurrency, noxu2NumGoroutines) } } -func computeClientRequestLatencyStats(count int32, sum, sumsq float64) (float64, float64) { - mean := sum / float64(count) - ss := sumsq - mean*sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean - return mean, math.Sqrt(ss / float64(count)) -} +func 
getRequestMetricsSnapshot(c clientset.Interface) (metricSnapshot, error) { -func getAvailableSeatsOfPriorityLevel(c clientset.Interface) (map[string]int, error) { resp, err := getMetrics(c) if err != nil { return nil, err @@ -286,85 +333,40 @@ func getAvailableSeatsOfPriorityLevel(c clientset.Interface) (map[string]int, er Opts: &expfmt.DecodeOptions{}, } - concurrency := make(map[string]int) + snapshot := metricSnapshot{} + for { var v model.Vector if err := decoder.Decode(&v); err != nil { if err == io.EOF { // Expected loop termination condition. - return concurrency, nil + return snapshot, nil } return nil, fmt.Errorf("failed decoding metrics: %v", err) } for _, metric := range v { - switch name := string(metric.Metric[model.MetricNameLabel]); name { - case requestConcurrencyLimitMetricsName: - concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + plLabel := string(metric.Metric[labelPriorityLevel]) + entry := plMetrics{} + if v, ok := snapshot[plLabel]; ok { + entry = v } - } - } -} - -func getRequestExecutionMetrics(c clientset.Interface) (map[string]float64, map[string]int, map[string]float64, map[string]int, error) { - - resp, err := getMetrics(c) - if err != nil { - return nil, nil, nil, nil, err - } - - dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) - decoder := expfmt.SampleDecoder{ - Dec: dec, - Opts: &expfmt.DecodeOptions{}, - } - - RequestExecutionSecondsSum := make(map[string]float64) - RequestExecutionSecondsCount := make(map[string]int) - PLSeatUtilSum := make(map[string]float64) - PLSeatUtilCount := make(map[string]int) - - for { - var v model.Vector - if err := decoder.Decode(&v); err != nil { - if err == io.EOF { - // Expected loop termination condition. 
- return RequestExecutionSecondsSum, RequestExecutionSecondsCount, - PLSeatUtilSum, PLSeatUtilCount, nil - } - return nil, nil, nil, nil, fmt.Errorf("failed decoding metrics: %v", err) - } - for _, metric := range v { switch name := string(metric.Metric[model.MetricNameLabel]); name { case requestExecutionSecondsSumName: - RequestExecutionSecondsSum[string(metric.Metric[labelPriorityLevel])] = float64(metric.Value) + entry.execSeconds.Sum = float64(metric.Value) case requestExecutionSecondsCountName: - RequestExecutionSecondsCount[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + entry.execSeconds.Count = int(metric.Value) case priorityLevelSeatUtilSumName: - PLSeatUtilSum[string(metric.Metric[labelPriorityLevel])] = float64(metric.Value) + entry.seatUtil.Sum = float64(metric.Value) case priorityLevelSeatUtilCountName: - PLSeatUtilCount[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) + entry.seatUtil.Count = int(metric.Value) + case requestConcurrencyLimitMetricsName: + entry.availableSeats = int(metric.Value) } + snapshot[plLabel] = entry } } } -func streamRequestsWithIndex(parallel int, request func(idx int), wg *sync.WaitGroup, stopCh <-chan struct{}) { - wg.Add(parallel) - for i := 0; i < parallel; i++ { - go func(idx int) { - defer wg.Done() - for { - select { - case <-stopCh: - return - default: - request(idx) - } - } - }(i) - } -} - // Webhook authorizer code copied from staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go with minor changes // V1Service mocks a remote service. 
type V1Service interface { @@ -454,14 +456,12 @@ func NewV1TestServer(s V1Service, cert, key, caCert []byte) (*httptest.Server, e type mockV1Service struct { allow bool statusCode int - called int } func (m *mockV1Service) Review(r *authorizationv1.SubjectAccessReview) { if r.Spec.User == "noxu1" || r.Spec.User == "noxu2" { time.Sleep(fakeworkDuration) // simulate fake work with sleep } - m.called++ r.Status.Allowed = m.allow } func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode } From c70ec593ec95f464cf6a2fbf5a03fffdb4ef5acd Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Wed, 3 Aug 2022 14:14:36 +0000 Subject: [PATCH 08/10] Fix minor issues and clean up --- .../flowcontrol/concurrency_util_test.go | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index f64d838c260..f766aab976b 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -105,23 +105,26 @@ type clientLatencyMeasurement struct { func (clm *clientLatencyMeasurement) reset() { clm.Mu.Lock() + defer clm.Mu.Unlock() clm.Sum = 0 clm.Count = 0 clm.SumSq = 0 - clm.Mu.Unlock() } func (clm *clientLatencyMeasurement) update(duration float64) { clm.Mu.Lock() + defer clm.Mu.Unlock() clm.Count += 1 clm.Sum += duration clm.SumSq += duration * duration - clm.Mu.Unlock() } func (clm *clientLatencyMeasurement) getStats() clientLatencyStats { + clm.Mu.Lock() + defer clm.Mu.Unlock() mean := clm.Sum / float64(clm.Count) ss := clm.SumSq - mean*clm.Sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean + stdDev := math.Sqrt(ss / float64(clm.Count)) cv := stdDev / mean return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv} @@ -129,7 +132,7 @@ func (clm *clientLatencyMeasurement) 
getStats() clientLatencyStats { type clientLatencyStats struct { mean float64 // latency average - stdDev float64 // latency standard deviation + stdDev float64 // latency population standard deviation cv float64 // latency coefficient of variation } @@ -138,25 +141,21 @@ type plMetricAvg struct { seatUtil float64 // average seat utilization } -func intervalMetricAvg(snapshots map[string]metricSnapshot, t0 string, t1 string, plLabel string) plMetricAvg { - plmT0 := snapshots[t0][plLabel] - plmT1 := snapshots[t1][plLabel] +func intervalMetricAvg(snapshot0 metricSnapshot, snapshot1 metricSnapshot, plLabel string) plMetricAvg { + plmT0 := snapshot0[plLabel] + plmT1 := snapshot1[plLabel] return plMetricAvg{ reqExecution: (plmT1.execSeconds.Sum - plmT0.execSeconds.Sum) / float64(plmT1.execSeconds.Count-plmT0.execSeconds.Count), seatUtil: (plmT1.seatUtil.Sum - plmT0.seatUtil.Sum) / float64(plmT1.seatUtil.Count-plmT0.seatUtil.Count), } } -// This integration test checks the client-side expected concurrency and the server-side observed concurrency -// to make sure that they are close within a small error bound and that the priority levels are isolated. -// This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead -// of concurrency. In order to mitigate the effects of system noise, authorization webhook is used to artificially -// increase request execution time to make the system noise relatively insignificant. -// This test calculates the server-side observed concurrency from average priority level seat utilization APF metric. -// It also assumes that -// (server-side request execution throughput) == (client-side request throughput) and derives a formula to -// calculate the client-side expected concurrency. The two are compared and a small error bound is determined -// from estimating the noise using 2*(standard deviation of requenst latency)/(avg request latency). 
+// TestConcurrencyIsolation tests the concurrency isolation between priority levels. +// The test defines two priority levels for this purpose, and corresponding flow schemas. +// To one priority level, this test sends many more concurrent requests than the configuration +// allows to execute at once, while sending fewer than allowed to the other priority level. +// The primary check is that the low flow gets all the seats it wants, but is modulated by +// recognizing that there are uncontrolled overheads in the system. func TestConcurrencyIsolation(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() // NOTE: disabling the feature should fail the test @@ -229,24 +228,22 @@ func TestConcurrencyIsolation(t *testing.T) { noxu1LatMeasure.reset() noxu2LatMeasure.reset() - // Snapshots maps from a time label to a metricSnapshot - snapshots := make(map[string]metricSnapshot) - snapshots["t0"], err = getRequestMetricsSnapshot(loopbackClient) + snapshot0, err := getRequestMetricsSnapshot(loopbackClient) if err != nil { t.Error(err) } time.Sleep(testTime) // after warming up, the test enters a steady state - snapshots["t1"], err = getRequestMetricsSnapshot(loopbackClient) + snapshot1, err := getRequestMetricsSnapshot(loopbackClient) if err != nil { t.Error(err) } close(stopCh) // Check the assumptions of the test - noxu1T0 := snapshots["t0"][plNoxu1.Name] - noxu1T1 := snapshots["t1"][plNoxu1.Name] - noxu2T0 := snapshots["t0"][plNoxu2.Name] - noxu2T1 := snapshots["t1"][plNoxu2.Name] + noxu1T0 := snapshot0[plNoxu1.Name] + noxu1T1 := snapshot1[plNoxu1.Name] + noxu2T0 := snapshot0[plNoxu2.Name] + noxu2T1 := snapshot1[plNoxu2.Name] if noxu1T0.seatUtil.Count >= noxu1T1.seatUtil.Count || noxu2T0.seatUtil.Count >= noxu2T1.seatUtil.Count { t.Errorf("SeatUtilCount check failed: noxu1 t0 count %d, t1 count %d; noxu2 t0 count %d, t1 count %d", noxu1T0.seatUtil.Count, noxu1T1.seatUtil.Count, 
noxu2T0.seatUtil.Count, noxu2T1.seatUtil.Count) @@ -261,7 +258,7 @@ func TestConcurrencyIsolation(t *testing.T) { t.Errorf("The number of available seats for test client priority levels are too small: (%d, %d). Expecting a number > 4", noxu1T0.availableSeats, noxu2T0.availableSeats) } - // No reuqests should be rejected under normal situations + // No requests should be rejected under normal situations _, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) if err != nil { t.Error(err) @@ -274,8 +271,8 @@ func TestConcurrencyIsolation(t *testing.T) { } // Calculate APF server side metric averages during the test interval - noxu1Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu1.Name) - noxu2Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu2.Name) + noxu1Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu1.Name) + noxu2Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu2.Name) t.Logf("\nnoxu1 avg request execution time %v\nnoxu2 avg request execution time %v", noxu1Avg.reqExecution, noxu2Avg.reqExecution) t.Logf("\nnoxu1 avg seat utilization %v\nnoxu2 avg seat utilization %v", noxu1Avg.seatUtil, noxu2Avg.seatUtil) @@ -290,11 +287,16 @@ func TestConcurrencyIsolation(t *testing.T) { noxu1ObservedConcurrency := noxu1Avg.seatUtil * float64(noxu1T0.availableSeats) noxu2ObservedConcurrency := noxu2Avg.seatUtil * float64(noxu2T0.availableSeats) // Expected concurrency is derived from equal throughput assumption on both the client-side and the server-side - // Expected concurrency computed can sometimes be larger than the number of available seats. 
We use the number of available seats as an upper bound noxu1ExpectedConcurrency := float64(noxu1NumGoroutines) * noxu1Avg.reqExecution / noxu1LatStats.mean noxu2ExpectedConcurrency := float64(noxu2NumGoroutines) * noxu2Avg.reqExecution / noxu2LatStats.mean t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency) - // Calculate the tolerable error margin and perform the final check + + // There are uncontrolled overheads that introduce noise into the system. The coefficient of variation (CV), that is, + // standard deviation divided by mean, for a class of traffic is a characterization of all the noise that applied to + // that class. We found that noxu1 generally had a much bigger CV than noxu2. This makes sense, because noxu1 probes + // more behavior --- the waiting in queues. So we take the minimum of the two as an indicator of the relative amount + // of noise that comes from all the other behavior. Currently, we use 2 times the experienced coefficient of variation + // as the margin of error. margin := 2 * math.Min(noxu1LatStats.cv, noxu2LatStats.cv) t.Logf("Error margin is %v", margin) @@ -309,14 +311,13 @@ func TestConcurrencyIsolation(t *testing.T) { t.Errorf("Concurrency observed by noxu2 is off. 
Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency) } - // Check server-side APF measurements - // For the elephant noxu1, the avg seat utilization should be close to 1.0 + // Check the server-side APF seat utilization measurements if math.Abs(1-noxu1Avg.seatUtil) > 0.05 { - t.Errorf("noxu1PLSeatUtilAvg=%v is too far from expected=1.0", noxu1Avg.seatUtil) + t.Errorf("noxu1Avg.seatUtil=%v is too far from expected=1.0", noxu1Avg.seatUtil) } - // For the mouse noxu2, the observed concurrency should be close to the number of goroutines it uses - if math.Abs(1-noxu2ObservedConcurrency/float64(noxu2NumGoroutines)) > 0.05 { - t.Errorf("noxu2ObservedConcurrency=%v is too far from noxu2NumGoroutines=%d", noxu2ObservedConcurrency, noxu2NumGoroutines) + noxu2ExpectedSeatUtil := float64(noxu2NumGoroutines) / float64(noxu2T0.availableSeats) + if math.Abs(noxu2ExpectedSeatUtil-noxu2Avg.seatUtil) > 0.05 { + t.Errorf("noxu2Avg.seatUtil=%v is too far from expected=%v", noxu2Avg.seatUtil, noxu2ExpectedSeatUtil) } } From 3c31efe32f68db78ba723ea9e169fe6ab4c4632c Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Wed, 3 Aug 2022 15:16:26 +0000 Subject: [PATCH 09/10] Add more comment and clip negative value to prevent Sqrt error --- .../flowcontrol/concurrency_util_test.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index f766aab976b..fabba1d17b0 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -124,7 +124,10 @@ func (clm *clientLatencyMeasurement) getStats() clientLatencyStats { defer clm.Mu.Unlock() mean := clm.Sum / float64(clm.Count) ss := clm.SumSq - mean*clm.Sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean - + // Set 
ss to 0 if a negative value results from floating point calculations + if ss < 0 { + ss = 0 + } stdDev := math.Sqrt(ss / float64(clm.Count)) cv := stdDev / mean return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv} @@ -141,7 +144,7 @@ type plMetricAvg struct { seatUtil float64 // average seat utilization } -func intervalMetricAvg(snapshot0 metricSnapshot, snapshot1 metricSnapshot, plLabel string) plMetricAvg { +func intervalMetricAvg(snapshot0, snapshot1 metricSnapshot, plLabel string) plMetricAvg { plmT0 := snapshot0[plLabel] plmT1 := snapshot1[plLabel] return plMetricAvg{ @@ -156,6 +159,17 @@ func intervalMetricAvg(snapshot0, snapshot1 metricSnapshot, plLab // allows to execute at once, while sending fewer than allowed to the other priority level. // The primary check is that the low flow gets all the seats it wants, but is modulated by // recognizing that there are uncontrolled overheads in the system. +// +// This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead +// of concurrency. In order to mitigate the effects of system noise, authorization webhook is used to artificially +// increase request execution time to make the system noise relatively insignificant. +// +// Secondarily, this test also checks the observed seat utilizations against values derived from expecting that +// the throughput observed by the client equals the execution throughput observed by the server. +// +// This test recognizes that there is noise in the measurements coming from uncontrolled overheads +// and unsynchronized reading of related quantities. This test takes as a relative error margin 2 times +// the smaller (of the two traffic classes) coefficient of variation of the client-observed latency. 
func TestConcurrencyIsolation(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() // NOTE: disabling the feature should fail the test From 94097457fd71d32c29076b16f2b2c71b478f6f49 Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Tue, 9 Aug 2022 12:19:00 +0000 Subject: [PATCH 10/10] Minor fixes --- .../apiserver/flowcontrol/concurrency_util_test.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index fabba1d17b0..0806fbe88c2 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -163,13 +163,8 @@ func intervalMetricAvg(snapshot0, snapshot1 metricSnapshot, plLabel string) plMe // This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead // of concurrency. In order to mitigate the effects of system noise, authorization webhook is used to artificially // increase request execution time to make the system noise relatively insignificant. -// // Secondarily, this test also checks the observed seat utilizations against values derived from expecting that // the throughput observed by the client equals the execution throughput observed by the server. -// -// This test recognizes that there is noise in the measurements coming from uncontrolled overheads -// and unsynchronized reading of related quantities. This test takes as a relative error margin 2 times -// the smaller (of the two traffic classes) coefficient of variation of the client-observed latency. 
func TestConcurrencyIsolation(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() // NOTE: disabling the feature should fail the test @@ -315,8 +310,9 @@ func TestConcurrencyIsolation(t *testing.T) { t.Logf("Error margin is %v", margin) isConcurrencyExpected := func(name string, observed float64, expected float64) bool { - t.Logf("%v relative error is %v", name, math.Abs(expected-observed)/expected) - return math.Abs(expected-observed)/expected <= margin + relativeErr := math.Abs(expected-observed) / expected + t.Logf("%v relative error is %v", name, relativeErr) + return relativeErr <= margin } if !isConcurrencyExpected(plNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) { t.Errorf("Concurrency observed by noxu1 is off. Expected: %v, observed: %v", noxu1ExpectedConcurrency, noxu1ObservedConcurrency)