diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go
index ad02cd3df6a..b7fe4cb73cd 100644
--- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package cache
 
 import (
+	"fmt"
 	"math/rand"
 	"testing"
 	"time"
@@ -35,21 +36,24 @@ func TestSimpleCache(t *testing.T) {
 // Note: the performance profile of this benchmark may not match that in the production.
 // When making change to SimpleCache, run test with and without concurrency to better understand the impact.
 // This is a tool to test and measure high concurrency of the cache in isolation and not to the Kubernetes usage of the Cache.
-func BenchmarkSimpleCache(b *testing.B) {
-	benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b)
+func BenchmarkCacheContentions(b *testing.B) {
+	for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} {
+		b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) {
+			benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys)
+		})
+		b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) {
+			benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys)
+		})
+	}
 }
 
 func TestStripedCache(t *testing.T) {
 	testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t)
 }
 
-func BenchmarkStripedCache(b *testing.B) {
-	benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b)
-}
-
-func benchmarkCache(cache cache, b *testing.B) {
+func benchmarkCache(cache cache, b *testing.B, numKeys int) {
 	keys := []string{}
-	for i := 0; i < b.N; i++ {
+	for i := 0; i < numKeys; i++ {
 		key := uuid.New().String()
 		keys = append(keys, key)
 	}
@@ -59,7 +63,7 @@ func benchmarkCache(cache cache, b *testing.B) {
 	b.SetParallelism(500)
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
-			key := keys[rand.Intn(b.N)]
+			key := keys[rand.Intn(numKeys)]
 			_, ok := cache.get(key)
 			if ok {
 				cache.remove(key)
diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go
index 736d477d4cf..f6ce6a3e2d2 100644
--- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go
@@ -21,6 +21,7 @@ import (
 	"crypto/hmac"
 	"crypto/rand"
 	"crypto/sha256"
+	"errors"
 	"fmt"
 	mathrand "math/rand"
 	"reflect"
@@ -173,14 +174,20 @@ func BenchmarkKeyFunc(b *testing.B) {
 
 func BenchmarkCachedTokenAuthenticator(b *testing.B) {
 	tokenCount := []int{100, 500, 2500, 12500, 62500}
-	for _, tc := range tokenCount {
-		b.Run(fmt.Sprintf("toks-%v", tc), newSingleBenchmark(tc).bench)
+	threadCount := []int{1, 16, 256}
+	for _, tokens := range tokenCount {
+		for _, threads := range threadCount {
+			newSingleBenchmark(tokens, threads).run(b)
+		}
 	}
 }
 
-func newSingleBenchmark(tokenCount int) *singleBenchmark {
-	s := &singleBenchmark{}
-	s.makeTokens(tokenCount)
+func newSingleBenchmark(tokens, threads int) *singleBenchmark {
+	s := &singleBenchmark{
+		threadCount: threads,
+		tokenCount:  tokens,
+	}
+	s.makeTokens()
 	return s
 }
 
@@ -188,6 +195,7 @@ func newSingleBenchmark(tokenCount int) *singleBenchmark {
 // question this benchmark answers is, "what's the average latency added by the
 // cache for N concurrent tokens?"
 type singleBenchmark struct {
+	threadCount int
 	// These token.* variables are set by makeTokens()
 	tokenCount int
 	// pre-computed response for a token
@@ -200,12 +208,10 @@ type singleBenchmark struct {
 	// Simulate slowness, qps limit, external service limitation, etc
 	chokepoint chan struct{}
 
-	b  *testing.B
-	wg sync.WaitGroup
+	b *testing.B
 }
 
-func (s *singleBenchmark) makeTokens(count int) {
-	s.tokenCount = count
+func (s *singleBenchmark) makeTokens() {
 	s.tokenToResponse = map[string]*cacheRecord{}
 	s.tokenToAuds = map[string]authenticator.Audiences{}
 	s.tokens = []string{}
@@ -232,7 +238,7 @@ func (s *singleBenchmark) makeTokens(count int) {
 			r.err = nil
 		default:
 			r.ok = false
-			r.err = fmt.Errorf("I can't think of a clever error name right now")
+			r.err = errors.New("I can't think of a clever error name right now")
 		}
 		s.tokens = append(s.tokens, tok)
 		s.tokenToResponse[tok] = &r
@@ -243,8 +249,8 @@ func (s *singleBenchmark) makeTokens(count int) {
 }
 
 func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authenticator.Response, bool, error) {
-	<-s.chokepoint
-	defer func() { s.chokepoint <- struct{}{} }()
+	s.chokepoint <- struct{}{}
+	defer func() { <-s.chokepoint }()
 	time.Sleep(1 * time.Millisecond)
 	r, ok := s.tokenToResponse[token]
 	if !ok {
@@ -253,41 +259,6 @@ func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authentica
 	return r.resp, r.ok, r.err
 }
 
-// To prevent contention over a channel, and to minimize the case where some
-// goroutines finish before others, vary the number of goroutines and batch
-// size based on the benchmark size.
-func (s *singleBenchmark) queueBatches() (<-chan int, int) {
-	batchSize := 1
-	threads := 1
-
-	switch {
-	case s.b.N < 5000:
-		threads = s.b.N
-		batchSize = 1
-	default:
-		threads = 5000
-		batchSize = s.b.N / (threads * 10)
-		if batchSize < 1 {
-			batchSize = 1
-		}
-	}
-
-	batches := make(chan int, threads*2)
-	s.wg.Add(1)
-	go func() {
-		defer s.wg.Done()
-		defer close(batches)
-		remaining := s.b.N
-		for remaining > batchSize {
-			batches <- batchSize
-			remaining -= batchSize
-		}
-		batches <- remaining
-	}()
-
-	return batches, threads
-}
-
 func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) {
 	tok := s.tokens[n]
 	auds := s.tokenToAuds[tok]
@@ -296,6 +267,10 @@ func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) {
 	a.AuthenticateToken(ctx, tok)
 }
 
+func (s *singleBenchmark) run(b *testing.B) {
+	b.Run(fmt.Sprintf("tokens=%d threads=%d", s.tokenCount, s.threadCount), s.bench)
+}
+
 func (s *singleBenchmark) bench(b *testing.B) {
 	s.b = b
 	a := newWithClock(
@@ -307,31 +282,19 @@ func (s *singleBenchmark) bench(b *testing.B) {
 	)
 	const maxInFlight = 40
 	s.chokepoint = make(chan struct{}, maxInFlight)
-	for i := 0; i < maxInFlight; i++ {
-		s.chokepoint <- struct{}{}
-	}
-	batches, threadCount := s.queueBatches()
 
 	s.b.ResetTimer()
 
-	for i := 0; i < threadCount; i++ {
-		s.wg.Add(1)
-		go func() {
-			defer s.wg.Done()
-			// don't contend over the lock for the global rand.Rand
-			r := mathrand.New(mathrand.NewSource(mathrand.Int63()))
-			for count := range batches {
-				for i := 0; i < count; i++ {
-					// some problems appear with random
-					// access, some appear with many
-					// requests for a single entry, so we
-					// do both.
-					s.doAuthForTokenN(r.Intn(len(s.tokens)), a)
-					s.doAuthForTokenN(0, a)
-				}
-			}
-		}()
-	}
-
-	s.wg.Wait()
+	b.SetParallelism(s.threadCount)
+	b.RunParallel(func(pb *testing.PB) {
+		r := mathrand.New(mathrand.NewSource(mathrand.Int63()))
+		for pb.Next() {
+			// some problems appear with random
+			// access, some appear with many
+			// requests for a single entry, so we
+			// do both.
+			s.doAuthForTokenN(r.Intn(len(s.tokens)), a)
+			s.doAuthForTokenN(0, a)
+		}
+	})
 }
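
Note on the striped cache exercised above: newStripedCache(32, fnvHashFunc, ...) spreads keys across 32 independently locked simple caches, so concurrent goroutines mostly contend on different stripes; that is what BenchmarkCacheContentions compares against the single-lock simple cache as the key count grows. The sketch below only illustrates the stripe-selection idea; fnvKeyHash is a hypothetical stand-in (the stripe count matches the test call site), not the package's actual implementation.

package main

import (
	"fmt"
	"hash/fnv"
)

// fnvKeyHash stands in for the fnvHashFunc the tests pass to newStripedCache:
// hash the string key to a uint32 and use it to pick a stripe.
func fnvKeyHash(key string) uint32 {
	h := fnv.New32()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	return h.Sum32()
}

func main() {
	const stripes = 32 // matches newStripedCache(32, ...) above
	for _, key := range []string{"token-a", "token-b", "token-c"} {
		fmt.Printf("%q -> stripe %d\n", key, fnvKeyHash(key)%stripes)
	}
}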
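
Note on the chokepoint change in lookup(): the patch flips the semaphore idiom from "receive to acquire" (which requires the channel to be pre-filled, hence the deleted seeding loop in bench) to "send to acquire", where a send into the buffered channel blocks once maxInFlight sends are outstanding. Both bound concurrency to the channel's capacity. A minimal sketch of the two equivalent styles; prefillStyle, sendStyle, and limit are illustrative names, not code from the patch:

package main

// acquire by receiving a pre-deposited token, release by returning it
func prefillStyle(sem chan struct{}, work func()) {
	<-sem
	defer func() { sem <- struct{}{} }()
	work()
}

// acquire by occupying buffer space (blocks when the buffer is full),
// release by draining one slot
func sendStyle(sem chan struct{}, work func()) {
	sem <- struct{}{}
	defer func() { <-sem }()
	work()
}

func main() {
	const limit = 40 // same bound as maxInFlight in the benchmark
	sem := make(chan struct{}, limit)

	sendStyle(sem, func() {}) // usable with no setup

	for i := 0; i < limit; i++ { // prefillStyle needs seeding first
		sem <- struct{}{}
	}
	prefillStyle(sem, func() {})
}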
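
Note on the parallel benchmark machinery: the rewrite drops the hand-rolled queueBatches/WaitGroup plumbing in favor of testing.B's built-ins. b.SetParallelism(p) makes the following b.RunParallel start p*GOMAXPROCS goroutines, pb.Next() hands out the b.N iterations, and each goroutine seeds its own rand.Rand so iterations do not serialize on the global source's mutex. Sizing the key space with an explicit numKeys instead of b.N also keeps hit/miss behavior independent of how long the benchmark runs. A standalone sketch of the pattern under those assumptions, with a plain map standing in for the cache (BenchmarkParallelPattern, numKeys, and the workload are illustrative):

package cache_test

import (
	"fmt"
	"math/rand"
	"testing"
)

func BenchmarkParallelPattern(b *testing.B) {
	const numKeys = 1 << 12 // fixed key space, deliberately decoupled from b.N
	keys := make([]string, numKeys)
	table := make(map[string]int, numKeys)
	for i := range keys {
		keys[i] = fmt.Sprintf("key-%d", i)
		table[keys[i]] = i
	}

	b.SetParallelism(500) // RunParallel starts 500*GOMAXPROCS goroutines
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		// per-goroutine source: no contention on the global rand lock
		r := rand.New(rand.NewSource(rand.Int63()))
		for pb.Next() {
			_ = table[keys[r.Intn(numKeys)]] // read-only lookup, safe concurrently
		}
	})
}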