diff --git a/hack/verify-prometheus-imports.sh b/hack/verify-prometheus-imports.sh index 3572a3d905f..b29f8949f90 100755 --- a/hack/verify-prometheus-imports.sh +++ b/hack/verify-prometheus-imports.sh @@ -74,6 +74,7 @@ allowed_prometheus_importers=( ./test/e2e_node/resource_metrics_test.go ./test/instrumentation/main_test.go ./test/integration/apiserver/flowcontrol/concurrency_test.go + ./test/integration/apiserver/flowcontrol/concurrency_util_test.go ./test/integration/metrics/metrics_test.go ) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go new file mode 100644 index 00000000000..0806fbe88c2 --- /dev/null +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -0,0 +1,517 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + authorizationv1 "k8s.io/api/authorization/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/admission/plugin/webhook/testcerts" + "k8s.io/apiserver/pkg/authorization/authorizer" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + webhookutil "k8s.io/apiserver/pkg/util/webhook" + "k8s.io/apiserver/plugin/pkg/authorizer/webhook" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + v1 "k8s.io/client-go/tools/clientcmd/api/v1" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + "k8s.io/kubernetes/pkg/controlplane" + "k8s.io/kubernetes/test/integration/framework" +) + +const ( + requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum" + requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count" + priorityLevelSeatUtilSumName = "apiserver_flowcontrol_priority_level_seat_utilization_sum" + priorityLevelSeatUtilCountName = "apiserver_flowcontrol_priority_level_seat_utilization_count" + fakeworkDuration = 200 * time.Millisecond + testWarmUpTime = 2 * time.Second + testTime = 10 * time.Second +) + +func setupWithAuthorizer(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int, authz authorizer.Authorizer) (*rest.Config, framework.TearDownFunc) { + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Ensure all clients are allowed to send requests. + opts.Authorization.Modes = []string{"AlwaysAllow"} + opts.GenericServerRunOptions.MaxRequestsInFlight = maxReadonlyRequestsInFlight + opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = maxMutatingRequestsInFlight + }, + ModifyServerConfig: func(config *controlplane.Config) { + config.GenericConfig.Authorization.Authorizer = authz + }, + }) + return kubeConfig, tearDownFn +} + +type SumAndCount struct { + Sum float64 + Count int +} + +type plMetrics struct { + execSeconds SumAndCount + seatUtil SumAndCount + availableSeats int +} + +// metricSnapshot maps from a priority level label to +// a plMetrics struct containing APF metrics of interest +type metricSnapshot map[string]plMetrics + +// Client request latency measurement +type clientLatencyMeasurement struct { + SumAndCount + SumSq float64 // latency sum of squares + Mu sync.Mutex +} + +func (clm *clientLatencyMeasurement) reset() { + clm.Mu.Lock() + defer clm.Mu.Unlock() + clm.Sum = 0 + clm.Count = 0 + clm.SumSq = 0 +} + +func (clm *clientLatencyMeasurement) update(duration float64) { + clm.Mu.Lock() + defer clm.Mu.Unlock() + clm.Count += 1 + clm.Sum += duration + clm.SumSq += duration * duration +} + +func (clm *clientLatencyMeasurement) getStats() clientLatencyStats { + clm.Mu.Lock() + defer clm.Mu.Unlock() + mean := clm.Sum / float64(clm.Count) + ss := clm.SumSq - mean*clm.Sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean + // Set ss to 0 if negative value is resulted from floating point calculations + if ss < 0 { + ss = 0 + } + stdDev := math.Sqrt(ss / float64(clm.Count)) + cv := stdDev / mean + return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv} +} + +type clientLatencyStats struct { + mean float64 // latency average + stdDev float64 // latency population standard deviation + cv float64 // latency coefficient of variation +} + +type plMetricAvg struct { + reqExecution float64 // average request execution time + seatUtil float64 // average seat utilization +} + +func intervalMetricAvg(snapshot0, snapshot1 metricSnapshot, plLabel string) plMetricAvg { + plmT0 := snapshot0[plLabel] + plmT1 := snapshot1[plLabel] + return plMetricAvg{ + reqExecution: (plmT1.execSeconds.Sum - plmT0.execSeconds.Sum) / float64(plmT1.execSeconds.Count-plmT0.execSeconds.Count), + seatUtil: (plmT1.seatUtil.Sum - plmT0.seatUtil.Sum) / float64(plmT1.seatUtil.Count-plmT0.seatUtil.Count), + } +} + +// TestConcurrencyIsolation tests the concurrency isolation between priority levels. +// The test defines two priority levels for this purpose, and corresponding flow schemas. +// To one priority level, this test sends many more concurrent requests than the configuration +// allows to execute at once, while sending fewer than allowed to the other priority level. +// The primary check is that the low flow gets all the seats it wants, but is modulated by +// recognizing that there are uncontrolled overheads in the system. +// +// This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead +// of concurrency. In order to mitigate the effects of system noise, authorization webhook is used to artificially +// increase request execution time to make the system noise relatively insignificant. +// Secondarily, this test also checks the observed seat utilizations against values derived from expecting that +// the throughput observed by the client equals the execution throughput observed by the server. +func TestConcurrencyIsolation(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() + // NOTE: disabling the feature should fail the test + // start webhook server + serv := &mockV1Service{allow: true, statusCode: 200} + s, err := NewV1TestServer(serv, testcerts.ServerCert, testcerts.ServerKey, testcerts.CACert) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + authorizer, err := newV1Authorizer(s.URL, testcerts.ClientCert, testcerts.ClientKey, testcerts.CACert, 0) + if err != nil { + t.Fatal(err) + } + + kubeConfig, closeFn := setupWithAuthorizer(t, 10, 10, authorizer) + defer closeFn() + + loopbackClient := clientset.NewForConfigOrDie(kubeConfig) + noxu1Client := getClientFor(kubeConfig, "noxu1") + noxu2Client := getClientFor(kubeConfig, "noxu2") + + queueLength := 50 + concurrencyShares := 100 + + plNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu1", concurrencyShares, queueLength) + if err != nil { + t.Error(err) + } + plNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu2", concurrencyShares, queueLength) + if err != nil { + t.Error(err) + } + + stopCh := make(chan struct{}) + wg := sync.WaitGroup{} + + // "elephant" + noxu1NumGoroutines := 5 + queueLength + var noxu1LatMeasure clientLatencyMeasurement + wg.Add(noxu1NumGoroutines) + streamRequests(noxu1NumGoroutines, func() { + start := time.Now() + _, err := noxu1Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{}) + duration := time.Since(start).Seconds() + noxu1LatMeasure.update(duration) + if err != nil { + t.Error(err) + } + }, &wg, stopCh) + // "mouse" + noxu2NumGoroutines := 3 + var noxu2LatMeasure clientLatencyMeasurement + wg.Add(noxu2NumGoroutines) + streamRequests(noxu2NumGoroutines, func() { + start := time.Now() + _, err := noxu2Client.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{}) + duration := time.Since(start).Seconds() + noxu2LatMeasure.update(duration) + if err != nil { + t.Error(err) + } + }, &wg, stopCh) + + // Warm up + time.Sleep(testWarmUpTime) + + noxu1LatMeasure.reset() + noxu2LatMeasure.reset() + snapshot0, err := getRequestMetricsSnapshot(loopbackClient) + if err != nil { + t.Error(err) + } + time.Sleep(testTime) // after warming up, the test enters a steady state + snapshot1, err := getRequestMetricsSnapshot(loopbackClient) + if err != nil { + t.Error(err) + } + close(stopCh) + + // Check the assumptions of the test + noxu1T0 := snapshot0[plNoxu1.Name] + noxu1T1 := snapshot1[plNoxu1.Name] + noxu2T0 := snapshot0[plNoxu2.Name] + noxu2T1 := snapshot1[plNoxu2.Name] + if noxu1T0.seatUtil.Count >= noxu1T1.seatUtil.Count || noxu2T0.seatUtil.Count >= noxu2T1.seatUtil.Count { + t.Errorf("SeatUtilCount check failed: noxu1 t0 count %d, t1 count %d; noxu2 t0 count %d, t1 count %d", + noxu1T0.seatUtil.Count, noxu1T1.seatUtil.Count, noxu2T0.seatUtil.Count, noxu2T1.seatUtil.Count) + } + t.Logf("noxu1 priority level concurrency limit: %d", noxu1T0.availableSeats) + t.Logf("noxu2 priority level concurrency limit: %d", noxu2T0.availableSeats) + if (noxu1T0.availableSeats != noxu1T1.availableSeats) || (noxu2T0.availableSeats != noxu2T1.availableSeats) { + t.Errorf("The number of available seats changed: noxu1 (%d, %d) noxu2 (%d, %d)", + noxu1T0.availableSeats, noxu1T1.availableSeats, noxu2T0.availableSeats, noxu2T1.availableSeats) + } + if (noxu1T0.availableSeats <= 4) || (noxu2T0.availableSeats <= 4) { + t.Errorf("The number of available seats for test client priority levels are too small: (%d, %d). Expecting a number > 4", + noxu1T0.availableSeats, noxu2T0.availableSeats) + } + // No requests should be rejected under normal situations + _, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) + if err != nil { + t.Error(err) + } + if rejectedReqCounts[plNoxu1.Name] > 0 { + t.Errorf(`%d requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu1.Name]) + } + if rejectedReqCounts[plNoxu2.Name] > 0 { + t.Errorf(`%d requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu2.Name]) + } + + // Calculate APF server side metric averages during the test interval + noxu1Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu1.Name) + noxu2Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu2.Name) + t.Logf("\nnoxu1 avg request execution time %v\nnoxu2 avg request execution time %v", noxu1Avg.reqExecution, noxu2Avg.reqExecution) + t.Logf("\nnoxu1 avg seat utilization %v\nnoxu2 avg seat utilization %v", noxu1Avg.seatUtil, noxu2Avg.seatUtil) + + // Wait till the client goroutines finish before computing the client side request latency statistics + wg.Wait() + noxu1LatStats := noxu1LatMeasure.getStats() + noxu2LatStats := noxu2LatMeasure.getStats() + t.Logf("noxu1 client request count %d duration mean %v stddev %v cv %v", noxu1LatMeasure.Count, noxu1LatStats.mean, noxu1LatStats.stdDev, noxu1LatStats.cv) + t.Logf("noxu2 client request count %d duration mean %v stddev %v cv %v", noxu2LatMeasure.Count, noxu2LatStats.mean, noxu2LatStats.stdDev, noxu2LatStats.cv) + + // Calculate server-side observed concurrency + noxu1ObservedConcurrency := noxu1Avg.seatUtil * float64(noxu1T0.availableSeats) + noxu2ObservedConcurrency := noxu2Avg.seatUtil * float64(noxu2T0.availableSeats) + // Expected concurrency is derived from equal throughput assumption on both the client-side and the server-side + noxu1ExpectedConcurrency := float64(noxu1NumGoroutines) * noxu1Avg.reqExecution / noxu1LatStats.mean + noxu2ExpectedConcurrency := float64(noxu2NumGoroutines) * noxu2Avg.reqExecution / noxu2LatStats.mean + t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency) + + // There are uncontrolled overheads that introduce noise into the system. The coefficient of variation (CV), that is, + // standard deviation divided by mean, for a class of traffic is a characterization of all the noise that applied to + // that class. We found that noxu1 generally had a much bigger CV than noxu2. This makes sense, because noxu1 probes + // more behavior --- the waiting in queues. So we take the minimum of the two as an indicator of the relative amount + // of noise that comes from all the other behavior. Currently, we use 2 times the experienced coefficient of variation + // as the margin of error. + margin := 2 * math.Min(noxu1LatStats.cv, noxu2LatStats.cv) + t.Logf("Error margin is %v", margin) + + isConcurrencyExpected := func(name string, observed float64, expected float64) bool { + relativeErr := math.Abs(expected-observed) / expected + t.Logf("%v relative error is %v", name, relativeErr) + return relativeErr <= margin + } + if !isConcurrencyExpected(plNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) { + t.Errorf("Concurrency observed by noxu1 is off. Expected: %v, observed: %v", noxu1ExpectedConcurrency, noxu1ObservedConcurrency) + } + if !isConcurrencyExpected(plNoxu2.Name, noxu2ObservedConcurrency, noxu2ExpectedConcurrency) { + t.Errorf("Concurrency observed by noxu2 is off. Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency) + } + + // Check the server-side APF seat utilization measurements + if math.Abs(1-noxu1Avg.seatUtil) > 0.05 { + t.Errorf("noxu1Avg.seatUtil=%v is too far from expected=1.0", noxu1Avg.seatUtil) + } + noxu2ExpectedSeatUtil := float64(noxu2NumGoroutines) / float64(noxu2T0.availableSeats) + if math.Abs(noxu2ExpectedSeatUtil-noxu2Avg.seatUtil) > 0.05 { + t.Errorf("noxu2Avg.seatUtil=%v is too far from expected=%v", noxu2Avg.seatUtil, noxu2ExpectedSeatUtil) + } +} + +func getRequestMetricsSnapshot(c clientset.Interface) (metricSnapshot, error) { + + resp, err := getMetrics(c) + if err != nil { + return nil, err + } + + dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) + decoder := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{}, + } + + snapshot := metricSnapshot{} + + for { + var v model.Vector + if err := decoder.Decode(&v); err != nil { + if err == io.EOF { + // Expected loop termination condition. + return snapshot, nil + } + return nil, fmt.Errorf("failed decoding metrics: %v", err) + } + for _, metric := range v { + plLabel := string(metric.Metric[labelPriorityLevel]) + entry := plMetrics{} + if v, ok := snapshot[plLabel]; ok { + entry = v + } + switch name := string(metric.Metric[model.MetricNameLabel]); name { + case requestExecutionSecondsSumName: + entry.execSeconds.Sum = float64(metric.Value) + case requestExecutionSecondsCountName: + entry.execSeconds.Count = int(metric.Value) + case priorityLevelSeatUtilSumName: + entry.seatUtil.Sum = float64(metric.Value) + case priorityLevelSeatUtilCountName: + entry.seatUtil.Count = int(metric.Value) + case requestConcurrencyLimitMetricsName: + entry.availableSeats = int(metric.Value) + } + snapshot[plLabel] = entry + } + } +} + +// Webhook authorizer code copied from staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go with minor changes +// V1Service mocks a remote service. +type V1Service interface { + Review(*authorizationv1.SubjectAccessReview) + HTTPStatusCode() int +} + +// NewV1TestServer wraps a V1Service as an httptest.Server. +func NewV1TestServer(s V1Service, cert, key, caCert []byte) (*httptest.Server, error) { + const webhookPath = "/testserver" + var tlsConfig *tls.Config + if cert != nil { + cert, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + tlsConfig = &tls.Config{Certificates: []tls.Certificate{cert}} + } + + if caCert != nil { + rootCAs := x509.NewCertPool() + rootCAs.AppendCertsFromPEM(caCert) + if tlsConfig == nil { + tlsConfig = &tls.Config{} + } + tlsConfig.ClientCAs = rootCAs + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + + serveHTTP := func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, fmt.Sprintf("unexpected method: %v", r.Method), http.StatusMethodNotAllowed) + return + } + if r.URL.Path != webhookPath { + http.Error(w, fmt.Sprintf("unexpected path: %v", r.URL.Path), http.StatusNotFound) + return + } + + var review authorizationv1.SubjectAccessReview + bodyData, _ := ioutil.ReadAll(r.Body) + if err := json.Unmarshal(bodyData, &review); err != nil { + http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest) + return + } + + // ensure we received the serialized review as expected + if review.APIVersion != "authorization.k8s.io/v1" { + http.Error(w, fmt.Sprintf("wrong api version: %s", string(bodyData)), http.StatusBadRequest) + return + } + // once we have a successful request, always call the review to record that we were called + s.Review(&review) + if s.HTTPStatusCode() < 200 || s.HTTPStatusCode() >= 300 { + http.Error(w, "HTTP Error", s.HTTPStatusCode()) + return + } + type status struct { + Allowed bool `json:"allowed"` + Reason string `json:"reason"` + EvaluationError string `json:"evaluationError"` + } + resp := struct { + APIVersion string `json:"apiVersion"` + Status status `json:"status"` + }{ + APIVersion: authorizationv1.SchemeGroupVersion.String(), + Status: status{review.Status.Allowed, review.Status.Reason, review.Status.EvaluationError}, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + } + + server := httptest.NewUnstartedServer(http.HandlerFunc(serveHTTP)) + server.TLS = tlsConfig + server.StartTLS() + + // Adjust the path to point to our custom path + serverURL, _ := url.Parse(server.URL) + serverURL.Path = webhookPath + server.URL = serverURL.String() + + return server, nil +} + +// A service that can be set to allow all or deny all authorization requests. +type mockV1Service struct { + allow bool + statusCode int +} + +func (m *mockV1Service) Review(r *authorizationv1.SubjectAccessReview) { + if r.Spec.User == "noxu1" || r.Spec.User == "noxu2" { + time.Sleep(fakeworkDuration) // simulate fake work with sleep + } + r.Status.Allowed = m.allow +} +func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode } + +// newV1Authorizer creates a temporary kubeconfig file from the provided arguments and attempts to load +// a new WebhookAuthorizer from it. +func newV1Authorizer(callbackURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration) (*webhook.WebhookAuthorizer, error) { + tempfile, err := ioutil.TempFile("", "") + if err != nil { + return nil, err + } + p := tempfile.Name() + defer os.Remove(p) + config := v1.Config{ + Clusters: []v1.NamedCluster{ + { + Cluster: v1.Cluster{Server: callbackURL, CertificateAuthorityData: ca}, + }, + }, + AuthInfos: []v1.NamedAuthInfo{ + { + AuthInfo: v1.AuthInfo{ClientCertificateData: clientCert, ClientKeyData: clientKey}, + }, + }, + } + if err := json.NewEncoder(tempfile).Encode(config); err != nil { + return nil, err + } + clientConfig, err := webhookutil.LoadKubeconfig(p, nil) + if err != nil { + return nil, err + } + + return webhook.New(clientConfig, "v1", cacheTime, cacheTime, testRetryBackoff) +} + +var testRetryBackoff = wait.Backoff{ + Duration: 5 * time.Millisecond, + Factor: 1.5, + Jitter: 0.2, + Steps: 5, +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1b99b6847ed..f8f4d979023 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1555,6 +1555,7 @@ k8s.io/apiserver/pkg/admission/plugin/webhook/namespace k8s.io/apiserver/pkg/admission/plugin/webhook/object k8s.io/apiserver/pkg/admission/plugin/webhook/request k8s.io/apiserver/pkg/admission/plugin/webhook/rules +k8s.io/apiserver/pkg/admission/plugin/webhook/testcerts k8s.io/apiserver/pkg/admission/plugin/webhook/validating k8s.io/apiserver/pkg/admission/testing k8s.io/apiserver/pkg/apis/apiserver