mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
adjust token cache benchmarks to get more accurate behavior
b.N is adjusted by pkg/testing using an internal heuristic: > The benchmark function must run the target code b.N times. During > benchmark execution, b.N is adjusted until the benchmark function > lasts long enough to be timed reliably. Using b.N to seed other parameters makes the benchmark behavior difficult to reason about. Before this change, thread count in the CachedTokenAuthenticator benchmark is always 5000, and batch size is almost always 1 when I run this locally. SimpleCache and StripedCache benchmarks had similarly strange scaling. After modifying CachedTokenAuthenticator to only adjust iterations based on b.N, the batch chan was an point of contention and I wasn't able to see any significant CPU consumption. This was fixed by using ParallelBench to do the batching, rather than using a chan.
This commit is contained in:
parent
ed3cc6afea
commit
43d34882c9
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
@ -35,21 +36,24 @@ func TestSimpleCache(t *testing.T) {
|
||||
// Note: the performance profile of this benchmark may not match that in the production.
|
||||
// When making change to SimpleCache, run test with and without concurrency to better understand the impact.
|
||||
// This is a tool to test and measure high concurrency of the cache in isolation and not to the Kubernetes usage of the Cache.
|
||||
func BenchmarkSimpleCache(b *testing.B) {
|
||||
benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b)
|
||||
func BenchmarkCacheContentions(b *testing.B) {
|
||||
for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} {
|
||||
b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) {
|
||||
benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys)
|
||||
})
|
||||
b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) {
|
||||
benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStripedCache(t *testing.T) {
|
||||
testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t)
|
||||
}
|
||||
|
||||
func BenchmarkStripedCache(b *testing.B) {
|
||||
benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b)
|
||||
}
|
||||
|
||||
func benchmarkCache(cache cache, b *testing.B) {
|
||||
func benchmarkCache(cache cache, b *testing.B, numKeys int) {
|
||||
keys := []string{}
|
||||
for i := 0; i < b.N; i++ {
|
||||
for i := 0; i < numKeys; i++ {
|
||||
key := uuid.New().String()
|
||||
keys = append(keys, key)
|
||||
}
|
||||
@ -59,7 +63,7 @@ func benchmarkCache(cache cache, b *testing.B) {
|
||||
b.SetParallelism(500)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
key := keys[rand.Intn(b.N)]
|
||||
key := keys[rand.Intn(numKeys)]
|
||||
_, ok := cache.get(key)
|
||||
if ok {
|
||||
cache.remove(key)
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"crypto/hmac"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
mathrand "math/rand"
|
||||
"reflect"
|
||||
@ -173,14 +174,20 @@ func BenchmarkKeyFunc(b *testing.B) {
|
||||
|
||||
func BenchmarkCachedTokenAuthenticator(b *testing.B) {
|
||||
tokenCount := []int{100, 500, 2500, 12500, 62500}
|
||||
for _, tc := range tokenCount {
|
||||
b.Run(fmt.Sprintf("toks-%v", tc), newSingleBenchmark(tc).bench)
|
||||
threadCount := []int{1, 16, 256}
|
||||
for _, tokens := range tokenCount {
|
||||
for _, threads := range threadCount {
|
||||
newSingleBenchmark(tokens, threads).run(b)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newSingleBenchmark(tokenCount int) *singleBenchmark {
|
||||
s := &singleBenchmark{}
|
||||
s.makeTokens(tokenCount)
|
||||
func newSingleBenchmark(tokens, threads int) *singleBenchmark {
|
||||
s := &singleBenchmark{
|
||||
threadCount: threads,
|
||||
tokenCount: tokens,
|
||||
}
|
||||
s.makeTokens()
|
||||
return s
|
||||
}
|
||||
|
||||
@ -188,6 +195,7 @@ func newSingleBenchmark(tokenCount int) *singleBenchmark {
|
||||
// question this benchmark answers is, "what's the average latency added by the
|
||||
// cache for N concurrent tokens?"
|
||||
type singleBenchmark struct {
|
||||
threadCount int
|
||||
// These token.* variables are set by makeTokens()
|
||||
tokenCount int
|
||||
// pre-computed response for a token
|
||||
@ -200,12 +208,10 @@ type singleBenchmark struct {
|
||||
// Simulate slowness, qps limit, external service limitation, etc
|
||||
chokepoint chan struct{}
|
||||
|
||||
b *testing.B
|
||||
wg sync.WaitGroup
|
||||
b *testing.B
|
||||
}
|
||||
|
||||
func (s *singleBenchmark) makeTokens(count int) {
|
||||
s.tokenCount = count
|
||||
func (s *singleBenchmark) makeTokens() {
|
||||
s.tokenToResponse = map[string]*cacheRecord{}
|
||||
s.tokenToAuds = map[string]authenticator.Audiences{}
|
||||
s.tokens = []string{}
|
||||
@ -232,7 +238,7 @@ func (s *singleBenchmark) makeTokens(count int) {
|
||||
r.err = nil
|
||||
default:
|
||||
r.ok = false
|
||||
r.err = fmt.Errorf("I can't think of a clever error name right now")
|
||||
r.err = errors.New("I can't think of a clever error name right now")
|
||||
}
|
||||
s.tokens = append(s.tokens, tok)
|
||||
s.tokenToResponse[tok] = &r
|
||||
@ -243,8 +249,8 @@ func (s *singleBenchmark) makeTokens(count int) {
|
||||
}
|
||||
|
||||
func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authenticator.Response, bool, error) {
|
||||
<-s.chokepoint
|
||||
defer func() { s.chokepoint <- struct{}{} }()
|
||||
s.chokepoint <- struct{}{}
|
||||
defer func() { <-s.chokepoint }()
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
r, ok := s.tokenToResponse[token]
|
||||
if !ok {
|
||||
@ -253,41 +259,6 @@ func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authentica
|
||||
return r.resp, r.ok, r.err
|
||||
}
|
||||
|
||||
// To prevent contention over a channel, and to minimize the case where some
|
||||
// goroutines finish before others, vary the number of goroutines and batch
|
||||
// size based on the benchmark size.
|
||||
func (s *singleBenchmark) queueBatches() (<-chan int, int) {
|
||||
batchSize := 1
|
||||
threads := 1
|
||||
|
||||
switch {
|
||||
case s.b.N < 5000:
|
||||
threads = s.b.N
|
||||
batchSize = 1
|
||||
default:
|
||||
threads = 5000
|
||||
batchSize = s.b.N / (threads * 10)
|
||||
if batchSize < 1 {
|
||||
batchSize = 1
|
||||
}
|
||||
}
|
||||
|
||||
batches := make(chan int, threads*2)
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
defer close(batches)
|
||||
remaining := s.b.N
|
||||
for remaining > batchSize {
|
||||
batches <- batchSize
|
||||
remaining -= batchSize
|
||||
}
|
||||
batches <- remaining
|
||||
}()
|
||||
|
||||
return batches, threads
|
||||
}
|
||||
|
||||
func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) {
|
||||
tok := s.tokens[n]
|
||||
auds := s.tokenToAuds[tok]
|
||||
@ -296,6 +267,10 @@ func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) {
|
||||
a.AuthenticateToken(ctx, tok)
|
||||
}
|
||||
|
||||
func (s *singleBenchmark) run(b *testing.B) {
|
||||
b.Run(fmt.Sprintf("tokens=%d threads=%d", s.tokenCount, s.threadCount), s.bench)
|
||||
}
|
||||
|
||||
func (s *singleBenchmark) bench(b *testing.B) {
|
||||
s.b = b
|
||||
a := newWithClock(
|
||||
@ -307,31 +282,19 @@ func (s *singleBenchmark) bench(b *testing.B) {
|
||||
)
|
||||
const maxInFlight = 40
|
||||
s.chokepoint = make(chan struct{}, maxInFlight)
|
||||
for i := 0; i < maxInFlight; i++ {
|
||||
s.chokepoint <- struct{}{}
|
||||
}
|
||||
|
||||
batches, threadCount := s.queueBatches()
|
||||
s.b.ResetTimer()
|
||||
|
||||
for i := 0; i < threadCount; i++ {
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
// don't contend over the lock for the global rand.Rand
|
||||
r := mathrand.New(mathrand.NewSource(mathrand.Int63()))
|
||||
for count := range batches {
|
||||
for i := 0; i < count; i++ {
|
||||
// some problems appear with random
|
||||
// access, some appear with many
|
||||
// requests for a single entry, so we
|
||||
// do both.
|
||||
s.doAuthForTokenN(r.Intn(len(s.tokens)), a)
|
||||
s.doAuthForTokenN(0, a)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
s.wg.Wait()
|
||||
b.SetParallelism(s.threadCount)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
r := mathrand.New(mathrand.NewSource(mathrand.Int63()))
|
||||
for pb.Next() {
|
||||
// some problems appear with random
|
||||
// access, some appear with many
|
||||
// requests for a single entry, so we
|
||||
// do both.
|
||||
s.doAuthForTokenN(r.Intn(len(s.tokens)), a)
|
||||
s.doAuthForTokenN(0, a)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user