From c70ec593ec95f464cf6a2fbf5a03fffdb4ef5acd Mon Sep 17 00:00:00 2001 From: Chih-Chieh Yang <7364402+cyang49@users.noreply.github.com> Date: Wed, 3 Aug 2022 14:14:36 +0000 Subject: [PATCH] Fix minor issues and clean up --- .../flowcontrol/concurrency_util_test.go | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index f64d838c260..f766aab976b 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -105,23 +105,26 @@ type clientLatencyMeasurement struct { func (clm *clientLatencyMeasurement) reset() { clm.Mu.Lock() + defer clm.Mu.Unlock() clm.Sum = 0 clm.Count = 0 clm.SumSq = 0 - clm.Mu.Unlock() } func (clm *clientLatencyMeasurement) update(duration float64) { clm.Mu.Lock() + defer clm.Mu.Unlock() clm.Count += 1 clm.Sum += duration clm.SumSq += duration * duration - clm.Mu.Unlock() } func (clm *clientLatencyMeasurement) getStats() clientLatencyStats { + clm.Mu.Lock() + defer clm.Mu.Unlock() mean := clm.Sum / float64(clm.Count) ss := clm.SumSq - mean*clm.Sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean + stdDev := math.Sqrt(ss / float64(clm.Count)) cv := stdDev / mean return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv} @@ -129,7 +132,7 @@ func (clm *clientLatencyMeasurement) getStats() clientLatencyStats { type clientLatencyStats struct { mean float64 // latency average - stdDev float64 // latency standard deviation + stdDev float64 // latency population standard deviation cv float64 // latency coefficient of variation } @@ -138,25 +141,21 @@ type plMetricAvg struct { seatUtil float64 // average seat utilization } -func intervalMetricAvg(snapshots map[string]metricSnapshot, t0 string, t1 string, plLabel string) plMetricAvg { - plmT0 := snapshots[t0][plLabel] - plmT1 := snapshots[t1][plLabel] +func intervalMetricAvg(snapshot0 metricSnapshot, snapshot1 metricSnapshot, plLabel string) plMetricAvg { + plmT0 := snapshot0[plLabel] + plmT1 := snapshot1[plLabel] return plMetricAvg{ reqExecution: (plmT1.execSeconds.Sum - plmT0.execSeconds.Sum) / float64(plmT1.execSeconds.Count-plmT0.execSeconds.Count), seatUtil: (plmT1.seatUtil.Sum - plmT0.seatUtil.Sum) / float64(plmT1.seatUtil.Count-plmT0.seatUtil.Count), } } -// This integration test checks the client-side expected concurrency and the server-side observed concurrency -// to make sure that they are close within a small error bound and that the priority levels are isolated. -// This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead -// of concurrency. In order to mitigate the effects of system noise, authorization webhook is used to artificially -// increase request execution time to make the system noise relatively insignificant. -// This test calculates the server-side observed concurrency from average priority level seat utilization APF metric. -// It also assumes that -// (server-side request execution throughput) == (client-side request throughput) and derives a formula to -// calculate the client-side expected concurrency. The two are compared and a small error bound is determined -// from estimating the noise using 2*(standard deviation of requenst latency)/(avg request latency). +// TestConcurrencyIsolation tests the concurrency isolation between priority levels. +// The test defines two priority levels for this purpose, and corresponding flow schemas. +// To one priority level, this test sends many more concurrent requests than the configuration +// allows to execute at once, while sending fewer than allowed to the other priority level. +// The primary check is that the low flow gets all the seats it wants, but is modulated by +// recognizing that there are uncontrolled overheads in the system. func TestConcurrencyIsolation(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() // NOTE: disabling the feature should fail the test @@ -229,24 +228,22 @@ func TestConcurrencyIsolation(t *testing.T) { noxu1LatMeasure.reset() noxu2LatMeasure.reset() - // Snapshots maps from a time label to a metricSnapshot - snapshots := make(map[string]metricSnapshot) - snapshots["t0"], err = getRequestMetricsSnapshot(loopbackClient) + snapshot0, err := getRequestMetricsSnapshot(loopbackClient) if err != nil { t.Error(err) } time.Sleep(testTime) // after warming up, the test enters a steady state - snapshots["t1"], err = getRequestMetricsSnapshot(loopbackClient) + snapshot1, err := getRequestMetricsSnapshot(loopbackClient) if err != nil { t.Error(err) } close(stopCh) // Check the assumptions of the test - noxu1T0 := snapshots["t0"][plNoxu1.Name] - noxu1T1 := snapshots["t1"][plNoxu1.Name] - noxu2T0 := snapshots["t0"][plNoxu2.Name] - noxu2T1 := snapshots["t1"][plNoxu2.Name] + noxu1T0 := snapshot0[plNoxu1.Name] + noxu1T1 := snapshot1[plNoxu1.Name] + noxu2T0 := snapshot0[plNoxu2.Name] + noxu2T1 := snapshot1[plNoxu2.Name] if noxu1T0.seatUtil.Count >= noxu1T1.seatUtil.Count || noxu2T0.seatUtil.Count >= noxu2T1.seatUtil.Count { t.Errorf("SeatUtilCount check failed: noxu1 t0 count %d, t1 count %d; noxu2 t0 count %d, t1 count %d", noxu1T0.seatUtil.Count, noxu1T1.seatUtil.Count, noxu2T0.seatUtil.Count, noxu2T1.seatUtil.Count) @@ -261,7 +258,7 @@ func TestConcurrencyIsolation(t *testing.T) { t.Errorf("The number of available seats for test client priority levels are too small: (%d, %d). Expecting a number > 4", noxu1T0.availableSeats, noxu2T0.availableSeats) } - // No reuqests should be rejected under normal situations + // No requests should be rejected under normal situations _, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) if err != nil { t.Error(err) @@ -274,8 +271,8 @@ func TestConcurrencyIsolation(t *testing.T) { } // Calculate APF server side metric averages during the test interval - noxu1Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu1.Name) - noxu2Avg := intervalMetricAvg(snapshots, "t0", "t1", plNoxu2.Name) + noxu1Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu1.Name) + noxu2Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu2.Name) t.Logf("\nnoxu1 avg request execution time %v\nnoxu2 avg request execution time %v", noxu1Avg.reqExecution, noxu2Avg.reqExecution) t.Logf("\nnoxu1 avg seat utilization %v\nnoxu2 avg seat utilization %v", noxu1Avg.seatUtil, noxu2Avg.seatUtil) @@ -290,11 +287,16 @@ func TestConcurrencyIsolation(t *testing.T) { noxu1ObservedConcurrency := noxu1Avg.seatUtil * float64(noxu1T0.availableSeats) noxu2ObservedConcurrency := noxu2Avg.seatUtil * float64(noxu2T0.availableSeats) // Expected concurrency is derived from equal throughput assumption on both the client-side and the server-side - // Expected concurrency computed can sometimes be larger than the number of available seats. We use the number of available seats as an upper bound noxu1ExpectedConcurrency := float64(noxu1NumGoroutines) * noxu1Avg.reqExecution / noxu1LatStats.mean noxu2ExpectedConcurrency := float64(noxu2NumGoroutines) * noxu2Avg.reqExecution / noxu2LatStats.mean t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency) - // Calculate the tolerable error margin and perform the final check + + // There are uncontrolled overheads that introduce noise into the system. The coefficient of variation (CV), that is, + // standard deviation divided by mean, for a class of traffic is a characterization of all the noise that applied to + // that class. We found that noxu1 generally had a much bigger CV than noxu2. This makes sense, because noxu1 probes + // more behavior --- the waiting in queues. So we take the minimum of the two as an indicator of the relative amount + // of noise that comes from all the other behavior. Currently, we use 2 times the experienced coefficient of variation + // as the margin of error. margin := 2 * math.Min(noxu1LatStats.cv, noxu2LatStats.cv) t.Logf("Error margin is %v", margin) @@ -309,14 +311,13 @@ func TestConcurrencyIsolation(t *testing.T) { t.Errorf("Concurrency observed by noxu2 is off. Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency) } - // Check server-side APF measurements - // For the elephant noxu1, the avg seat utilization should be close to 1.0 + // Check the server-side APF seat utilization measurements if math.Abs(1-noxu1Avg.seatUtil) > 0.05 { - t.Errorf("noxu1PLSeatUtilAvg=%v is too far from expected=1.0", noxu1Avg.seatUtil) + t.Errorf("noxu1Avg.seatUtil=%v is too far from expected=1.0", noxu1Avg.seatUtil) } - // For the mouse noxu2, the observed concurrency should be close to the number of goroutines it uses - if math.Abs(1-noxu2ObservedConcurrency/float64(noxu2NumGoroutines)) > 0.05 { - t.Errorf("noxu2ObservedConcurrency=%v is too far from noxu2NumGoroutines=%d", noxu2ObservedConcurrency, noxu2NumGoroutines) + noxu2ExpectedSeatUtil := float64(noxu2NumGoroutines) / float64(noxu2T0.availableSeats) + if math.Abs(noxu2ExpectedSeatUtil-noxu2Avg.seatUtil) > 0.05 { + t.Errorf("noxu2Avg.seatUtil=%v is too far from expected=%v", noxu2Avg.seatUtil, noxu2ExpectedSeatUtil) } }