Merge pull request #96244 from yue9944882/apf-filter-test-failure

Fixes APF filter test failure due to racy read&write
This commit is contained in:
Kubernetes Prow Robot 2020-11-09 13:44:42 -08:00 committed by GitHub
commit d9d626f1e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,7 +18,6 @@ package filters
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
@ -178,7 +177,7 @@ func TestApfRejectRequest(t *testing.T) {
t.Error(err)
}
checkForExpectedMetricsWithRetry(t, []string{
checkForExpectedMetrics(t, []string{
"apiserver_request_terminations_total",
"apiserver_dropped_requests_total",
})
@ -203,7 +202,7 @@ func TestApfExemptRequest(t *testing.T) {
t.Error(err)
}
checkForExpectedMetricsWithRetry(t, []string{
checkForExpectedMetrics(t, []string{
"apiserver_current_inflight_requests",
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
"apiserver_flowcontrol_read_vs_write_request_count_samples",
@ -229,7 +228,7 @@ func TestApfExecuteRequest(t *testing.T) {
t.Error(err)
}
checkForExpectedMetricsWithRetry(t, []string{
checkForExpectedMetrics(t, []string{
"apiserver_current_inflight_requests",
"apiserver_current_inqueue_requests",
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
@ -246,8 +245,11 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
time.Sleep(time.Millisecond * 50)
concurrentRequests := 5
var preStartExecute, postStartExecute, preEnqueue, postEnqueue, preDequeue, postDequeue, finishExecute sync.WaitGroup
for _, wg := range []*sync.WaitGroup{&preStartExecute, &postStartExecute, &preEnqueue, &postEnqueue, &preDequeue, &postDequeue, &finishExecute} {
preStartExecute, postStartExecute := &sync.WaitGroup{}, &sync.WaitGroup{}
preEnqueue, postEnqueue := &sync.WaitGroup{}, &sync.WaitGroup{}
preDequeue, postDequeue := &sync.WaitGroup{}, &sync.WaitGroup{}
finishExecute := &sync.WaitGroup{}
for _, wg := range []*sync.WaitGroup{preStartExecute, postStartExecute, preEnqueue, postEnqueue, preDequeue, postDequeue, finishExecute} {
wg.Add(concurrentRequests)
}
@ -306,7 +308,7 @@ func TestApfExecuteMultipleRequests(t *testing.T) {
}
wg.Wait()
checkForExpectedMetricsWithRetry(t, []string{
checkForExpectedMetrics(t, []string{
"apiserver_current_inflight_requests",
"apiserver_current_inqueue_requests",
"apiserver_flowcontrol_read_vs_write_request_count_watermarks",
@ -324,49 +326,30 @@ func TestApfCancelWaitRequest(t *testing.T) {
t.Error(err)
}
checkForExpectedMetricsWithRetry(t, []string{
checkForExpectedMetrics(t, []string{
"apiserver_current_inflight_requests",
"apiserver_request_terminations_total",
"apiserver_dropped_requests_total",
})
}
// wait async metrics to be collected
func checkForExpectedMetricsWithRetry(t *testing.T, expectedMetrics []string) {
maxRetries := 5
var checkErrors []error
for i := 0; i < maxRetries; i++ {
t.Logf("Check for expected metrics with retry %d", i)
metricsFamily, err := legacyregistry.DefaultGatherer.Gather()
if err != nil {
t.Fatalf("Failed to gather metrics %v", err)
}
metrics := map[string]interface{}{}
for _, mf := range metricsFamily {
metrics[*mf.Name] = mf
}
checkErrors = checkForExpectedMetrics(expectedMetrics, metrics)
if checkErrors == nil {
return
}
time.Sleep(1 * time.Second)
// gathers and checks the metrics.
func checkForExpectedMetrics(t *testing.T, expectedMetrics []string) {
metricsFamily, err := legacyregistry.DefaultGatherer.Gather()
if err != nil {
t.Fatalf("Failed to gather metrics %v", err)
}
metrics := map[string]interface{}{}
for _, mf := range metricsFamily {
metrics[*mf.Name] = mf
}
for _, checkError := range checkErrors {
t.Error(checkError)
}
}
func checkForExpectedMetrics(expectedMetrics []string, metrics map[string]interface{}) []error {
var errs []error
for _, metricName := range expectedMetrics {
if _, ok := metrics[metricName]; !ok {
if !ok {
errs = append(errs, errors.New("Scraped metrics did not include expected metric "+metricName))
t.Errorf("Scraped metrics did not include expected metric %s", metricName)
}
}
}
return errs
}