From 43d34882c9b3612d933b97b6e470fd8d36fe492b Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Sat, 26 Oct 2019 14:12:41 -0700 Subject: [PATCH] adjust token cache benchmarks to get more accurate behavior b.N is adjusted by pkg/testing using an internal heuristic: > The benchmark function must run the target code b.N times. During > benchmark execution, b.N is adjusted until the benchmark function > lasts long enough to be timed reliably. Using b.N to seed other parameters makes the benchmark behavior difficult to reason about. Before this change, thread count in the CachedTokenAuthenticator benchmark is always 5000, and batch size is almost always 1 when I run this locally. SimpleCache and StripedCache benchmarks had similarly strange scaling. After modifying CachedTokenAuthenticator to only adjust iterations based on b.N, the batch chan was an point of contention and I wasn't able to see any significant CPU consumption. This was fixed by using ParallelBench to do the batching, rather than using a chan. --- .../authentication/token/cache/cache_test.go | 22 ++-- .../cache/cached_token_authenticator_test.go | 105 ++++++------------ 2 files changed, 47 insertions(+), 80 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go index ad02cd3df6a..b7fe4cb73cd 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "fmt" "math/rand" "testing" "time" @@ -35,21 +36,24 @@ func TestSimpleCache(t *testing.T) { // Note: the performance profile of this benchmark may not match that in the production. // When making change to SimpleCache, run test with and without concurrency to better understand the impact. // This is a tool to test and measure high concurrency of the cache in isolation and not to the Kubernetes usage of the Cache. -func BenchmarkSimpleCache(b *testing.B) { - benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b) +func BenchmarkCacheContentions(b *testing.B) { + for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} { + b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) { + benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys) + }) + b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) { + benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys) + }) + } } func TestStripedCache(t *testing.T) { testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t) } -func BenchmarkStripedCache(b *testing.B) { - benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b) -} - -func benchmarkCache(cache cache, b *testing.B) { +func benchmarkCache(cache cache, b *testing.B, numKeys int) { keys := []string{} - for i := 0; i < b.N; i++ { + for i := 0; i < numKeys; i++ { key := uuid.New().String() keys = append(keys, key) } @@ -59,7 +63,7 @@ func benchmarkCache(cache cache, b *testing.B) { b.SetParallelism(500) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - key := keys[rand.Intn(b.N)] + key := keys[rand.Intn(numKeys)] _, ok := cache.get(key) if ok { cache.remove(key) diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go index 736d477d4cf..f6ce6a3e2d2 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go @@ -21,6 +21,7 @@ import ( "crypto/hmac" "crypto/rand" "crypto/sha256" + "errors" "fmt" mathrand "math/rand" "reflect" @@ -173,14 +174,20 @@ func BenchmarkKeyFunc(b *testing.B) { func BenchmarkCachedTokenAuthenticator(b *testing.B) { tokenCount := []int{100, 500, 2500, 12500, 62500} - for _, tc := range tokenCount { - b.Run(fmt.Sprintf("toks-%v", tc), newSingleBenchmark(tc).bench) + threadCount := []int{1, 16, 256} + for _, tokens := range tokenCount { + for _, threads := range threadCount { + newSingleBenchmark(tokens, threads).run(b) + } } } -func newSingleBenchmark(tokenCount int) *singleBenchmark { - s := &singleBenchmark{} - s.makeTokens(tokenCount) +func newSingleBenchmark(tokens, threads int) *singleBenchmark { + s := &singleBenchmark{ + threadCount: threads, + tokenCount: tokens, + } + s.makeTokens() return s } @@ -188,6 +195,7 @@ func newSingleBenchmark(tokenCount int) *singleBenchmark { // question this benchmark answers is, "what's the average latency added by the // cache for N concurrent tokens?" type singleBenchmark struct { + threadCount int // These token.* variables are set by makeTokens() tokenCount int // pre-computed response for a token @@ -200,12 +208,10 @@ type singleBenchmark struct { // Simulate slowness, qps limit, external service limitation, etc chokepoint chan struct{} - b *testing.B - wg sync.WaitGroup + b *testing.B } -func (s *singleBenchmark) makeTokens(count int) { - s.tokenCount = count +func (s *singleBenchmark) makeTokens() { s.tokenToResponse = map[string]*cacheRecord{} s.tokenToAuds = map[string]authenticator.Audiences{} s.tokens = []string{} @@ -232,7 +238,7 @@ func (s *singleBenchmark) makeTokens(count int) { r.err = nil default: r.ok = false - r.err = fmt.Errorf("I can't think of a clever error name right now") + r.err = errors.New("I can't think of a clever error name right now") } s.tokens = append(s.tokens, tok) s.tokenToResponse[tok] = &r @@ -243,8 +249,8 @@ func (s *singleBenchmark) makeTokens(count int) { } func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authenticator.Response, bool, error) { - <-s.chokepoint - defer func() { s.chokepoint <- struct{}{} }() + s.chokepoint <- struct{}{} + defer func() { <-s.chokepoint }() time.Sleep(1 * time.Millisecond) r, ok := s.tokenToResponse[token] if !ok { @@ -253,41 +259,6 @@ func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authentica return r.resp, r.ok, r.err } -// To prevent contention over a channel, and to minimize the case where some -// goroutines finish before others, vary the number of goroutines and batch -// size based on the benchmark size. -func (s *singleBenchmark) queueBatches() (<-chan int, int) { - batchSize := 1 - threads := 1 - - switch { - case s.b.N < 5000: - threads = s.b.N - batchSize = 1 - default: - threads = 5000 - batchSize = s.b.N / (threads * 10) - if batchSize < 1 { - batchSize = 1 - } - } - - batches := make(chan int, threads*2) - s.wg.Add(1) - go func() { - defer s.wg.Done() - defer close(batches) - remaining := s.b.N - for remaining > batchSize { - batches <- batchSize - remaining -= batchSize - } - batches <- remaining - }() - - return batches, threads -} - func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) { tok := s.tokens[n] auds := s.tokenToAuds[tok] @@ -296,6 +267,10 @@ func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) { a.AuthenticateToken(ctx, tok) } +func (s *singleBenchmark) run(b *testing.B) { + b.Run(fmt.Sprintf("tokens=%d threads=%d", s.tokenCount, s.threadCount), s.bench) +} + func (s *singleBenchmark) bench(b *testing.B) { s.b = b a := newWithClock( @@ -307,31 +282,19 @@ func (s *singleBenchmark) bench(b *testing.B) { ) const maxInFlight = 40 s.chokepoint = make(chan struct{}, maxInFlight) - for i := 0; i < maxInFlight; i++ { - s.chokepoint <- struct{}{} - } - batches, threadCount := s.queueBatches() s.b.ResetTimer() - for i := 0; i < threadCount; i++ { - s.wg.Add(1) - go func() { - defer s.wg.Done() - // don't contend over the lock for the global rand.Rand - r := mathrand.New(mathrand.NewSource(mathrand.Int63())) - for count := range batches { - for i := 0; i < count; i++ { - // some problems appear with random - // access, some appear with many - // requests for a single entry, so we - // do both. - s.doAuthForTokenN(r.Intn(len(s.tokens)), a) - s.doAuthForTokenN(0, a) - } - } - }() - } - - s.wg.Wait() + b.SetParallelism(s.threadCount) + b.RunParallel(func(pb *testing.PB) { + r := mathrand.New(mathrand.NewSource(mathrand.Int63())) + for pb.Next() { + // some problems appear with random + // access, some appear with many + // requests for a single entry, so we + // do both. + s.doAuthForTokenN(r.Intn(len(s.tokens)), a) + s.doAuthForTokenN(0, a) + } + }) }