Merge pull request #84423 from mikedanese/tokbench

adjust token cache benchmarks to get more accurate behavior
This commit is contained in:
Kubernetes Prow Robot 2019-11-12 04:48:07 -08:00 committed by GitHub
commit 0708eb5903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 80 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"fmt"
"math/rand" "math/rand"
"testing" "testing"
"time" "time"
@ -35,21 +36,24 @@ func TestSimpleCache(t *testing.T) {
// Note: the performance profile of this benchmark may not match that in the production. // Note: the performance profile of this benchmark may not match that in the production.
// When making change to SimpleCache, run test with and without concurrency to better understand the impact. // When making change to SimpleCache, run test with and without concurrency to better understand the impact.
// This is a tool to test and measure high concurrency of the cache in isolation and not to the Kubernetes usage of the Cache. // This is a tool to test and measure high concurrency of the cache in isolation and not to the Kubernetes usage of the Cache.
func BenchmarkSimpleCache(b *testing.B) { func BenchmarkCacheContentions(b *testing.B) {
benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b) for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} {
b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) {
benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys)
})
b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) {
benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys)
})
}
} }
func TestStripedCache(t *testing.T) { func TestStripedCache(t *testing.T) {
testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t) testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t)
} }
func BenchmarkStripedCache(b *testing.B) { func benchmarkCache(cache cache, b *testing.B, numKeys int) {
benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b)
}
func benchmarkCache(cache cache, b *testing.B) {
keys := []string{} keys := []string{}
for i := 0; i < b.N; i++ { for i := 0; i < numKeys; i++ {
key := uuid.New().String() key := uuid.New().String()
keys = append(keys, key) keys = append(keys, key)
} }
@ -59,7 +63,7 @@ func benchmarkCache(cache cache, b *testing.B) {
b.SetParallelism(500) b.SetParallelism(500)
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
key := keys[rand.Intn(b.N)] key := keys[rand.Intn(numKeys)]
_, ok := cache.get(key) _, ok := cache.get(key)
if ok { if ok {
cache.remove(key) cache.remove(key)

View File

@ -21,6 +21,7 @@ import (
"crypto/hmac" "crypto/hmac"
"crypto/rand" "crypto/rand"
"crypto/sha256" "crypto/sha256"
"errors"
"fmt" "fmt"
mathrand "math/rand" mathrand "math/rand"
"reflect" "reflect"
@ -173,14 +174,20 @@ func BenchmarkKeyFunc(b *testing.B) {
func BenchmarkCachedTokenAuthenticator(b *testing.B) { func BenchmarkCachedTokenAuthenticator(b *testing.B) {
tokenCount := []int{100, 500, 2500, 12500, 62500} tokenCount := []int{100, 500, 2500, 12500, 62500}
for _, tc := range tokenCount { threadCount := []int{1, 16, 256}
b.Run(fmt.Sprintf("toks-%v", tc), newSingleBenchmark(tc).bench) for _, tokens := range tokenCount {
for _, threads := range threadCount {
newSingleBenchmark(tokens, threads).run(b)
}
} }
} }
func newSingleBenchmark(tokenCount int) *singleBenchmark { func newSingleBenchmark(tokens, threads int) *singleBenchmark {
s := &singleBenchmark{} s := &singleBenchmark{
s.makeTokens(tokenCount) threadCount: threads,
tokenCount: tokens,
}
s.makeTokens()
return s return s
} }
@ -188,6 +195,7 @@ func newSingleBenchmark(tokenCount int) *singleBenchmark {
// question this benchmark answers is, "what's the average latency added by the // question this benchmark answers is, "what's the average latency added by the
// cache for N concurrent tokens?" // cache for N concurrent tokens?"
type singleBenchmark struct { type singleBenchmark struct {
threadCount int
// These token.* variables are set by makeTokens() // These token.* variables are set by makeTokens()
tokenCount int tokenCount int
// pre-computed response for a token // pre-computed response for a token
@ -201,11 +209,9 @@ type singleBenchmark struct {
chokepoint chan struct{} chokepoint chan struct{}
b *testing.B b *testing.B
wg sync.WaitGroup
} }
func (s *singleBenchmark) makeTokens(count int) { func (s *singleBenchmark) makeTokens() {
s.tokenCount = count
s.tokenToResponse = map[string]*cacheRecord{} s.tokenToResponse = map[string]*cacheRecord{}
s.tokenToAuds = map[string]authenticator.Audiences{} s.tokenToAuds = map[string]authenticator.Audiences{}
s.tokens = []string{} s.tokens = []string{}
@ -232,7 +238,7 @@ func (s *singleBenchmark) makeTokens(count int) {
r.err = nil r.err = nil
default: default:
r.ok = false r.ok = false
r.err = fmt.Errorf("I can't think of a clever error name right now") r.err = errors.New("I can't think of a clever error name right now")
} }
s.tokens = append(s.tokens, tok) s.tokens = append(s.tokens, tok)
s.tokenToResponse[tok] = &r s.tokenToResponse[tok] = &r
@ -243,8 +249,8 @@ func (s *singleBenchmark) makeTokens(count int) {
} }
func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authenticator.Response, bool, error) { func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authenticator.Response, bool, error) {
<-s.chokepoint s.chokepoint <- struct{}{}
defer func() { s.chokepoint <- struct{}{} }() defer func() { <-s.chokepoint }()
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
r, ok := s.tokenToResponse[token] r, ok := s.tokenToResponse[token]
if !ok { if !ok {
@ -253,41 +259,6 @@ func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authentica
return r.resp, r.ok, r.err return r.resp, r.ok, r.err
} }
// To prevent contention over a channel, and to minimize the case where some
// goroutines finish before others, vary the number of goroutines and batch
// size based on the benchmark size.
func (s *singleBenchmark) queueBatches() (<-chan int, int) {
batchSize := 1
threads := 1
switch {
case s.b.N < 5000:
threads = s.b.N
batchSize = 1
default:
threads = 5000
batchSize = s.b.N / (threads * 10)
if batchSize < 1 {
batchSize = 1
}
}
batches := make(chan int, threads*2)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer close(batches)
remaining := s.b.N
for remaining > batchSize {
batches <- batchSize
remaining -= batchSize
}
batches <- remaining
}()
return batches, threads
}
func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) { func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) {
tok := s.tokens[n] tok := s.tokens[n]
auds := s.tokenToAuds[tok] auds := s.tokenToAuds[tok]
@ -296,6 +267,10 @@ func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) {
a.AuthenticateToken(ctx, tok) a.AuthenticateToken(ctx, tok)
} }
func (s *singleBenchmark) run(b *testing.B) {
b.Run(fmt.Sprintf("tokens=%d threads=%d", s.tokenCount, s.threadCount), s.bench)
}
func (s *singleBenchmark) bench(b *testing.B) { func (s *singleBenchmark) bench(b *testing.B) {
s.b = b s.b = b
a := newWithClock( a := newWithClock(
@ -307,21 +282,13 @@ func (s *singleBenchmark) bench(b *testing.B) {
) )
const maxInFlight = 40 const maxInFlight = 40
s.chokepoint = make(chan struct{}, maxInFlight) s.chokepoint = make(chan struct{}, maxInFlight)
for i := 0; i < maxInFlight; i++ {
s.chokepoint <- struct{}{}
}
batches, threadCount := s.queueBatches()
s.b.ResetTimer() s.b.ResetTimer()
for i := 0; i < threadCount; i++ { b.SetParallelism(s.threadCount)
s.wg.Add(1) b.RunParallel(func(pb *testing.PB) {
go func() {
defer s.wg.Done()
// don't contend over the lock for the global rand.Rand
r := mathrand.New(mathrand.NewSource(mathrand.Int63())) r := mathrand.New(mathrand.NewSource(mathrand.Int63()))
for count := range batches { for pb.Next() {
for i := 0; i < count; i++ {
// some problems appear with random // some problems appear with random
// access, some appear with many // access, some appear with many
// requests for a single entry, so we // requests for a single entry, so we
@ -329,9 +296,5 @@ func (s *singleBenchmark) bench(b *testing.B) {
s.doAuthForTokenN(r.Intn(len(s.tokens)), a) s.doAuthForTokenN(r.Intn(len(s.tokens)), a)
s.doAuthForTokenN(0, a) s.doAuthForTokenN(0, a)
} }
} })
}()
}
s.wg.Wait()
} }