diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD index 66c09d44844..d4f0a008840 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD @@ -15,6 +15,7 @@ go_test( embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//vendor/github.com/pborman/uuid:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go index 291fb4e3212..736d477d4cf 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go @@ -21,12 +21,15 @@ import ( "crypto/hmac" "crypto/rand" "crypto/sha256" + "fmt" + mathrand "math/rand" "reflect" "sync" "testing" "time" utilclock "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/user" ) @@ -51,11 +54,13 @@ func TestCachedTokenAuthenticator(t *testing.T) { a.AuthenticateToken(context.Background(), "bad1") a.AuthenticateToken(context.Background(), "bad2") a.AuthenticateToken(context.Background(), "bad3") + fakeClock.Step(2 * time.Microsecond) a.AuthenticateToken(context.Background(), "bad1") a.AuthenticateToken(context.Background(), "bad2") a.AuthenticateToken(context.Background(), "bad3") + fakeClock.Step(2 * time.Microsecond) if !reflect.DeepEqual(calledWithToken, []string{"bad1", "bad2", "bad3", "bad1", "bad2", "bad3"}) { - t.Errorf("Expected failing calls to bypass cache, got %v", calledWithToken) + t.Errorf("Expected failing calls to not stay in the cache, got %v", calledWithToken) } // reset calls, make the backend return success for three user tokens @@ -165,3 +170,168 @@ func BenchmarkKeyFunc(b *testing.B) { bKey = key }) } + +func BenchmarkCachedTokenAuthenticator(b *testing.B) { + tokenCount := []int{100, 500, 2500, 12500, 62500} + for _, tc := range tokenCount { + b.Run(fmt.Sprintf("toks-%v", tc), newSingleBenchmark(tc).bench) + } +} + +func newSingleBenchmark(tokenCount int) *singleBenchmark { + s := &singleBenchmark{} + s.makeTokens(tokenCount) + return s +} + +// singleBenchmark collects all the state needed to run a benchmark. The +// question this benchmark answers is, "what's the average latency added by the +// cache for N concurrent tokens?" +type singleBenchmark struct { + // These token.* variables are set by makeTokens() + tokenCount int + // pre-computed response for a token + tokenToResponse map[string]*cacheRecord + // include audiences for some + tokenToAuds map[string]authenticator.Audiences + // a list makes it easy to select a random one + tokens []string + + // Simulate slowness, qps limit, external service limitation, etc + chokepoint chan struct{} + + b *testing.B + wg sync.WaitGroup +} + +func (s *singleBenchmark) makeTokens(count int) { + s.tokenCount = count + s.tokenToResponse = map[string]*cacheRecord{} + s.tokenToAuds = map[string]authenticator.Audiences{} + s.tokens = []string{} + + for i := 0; i < s.tokenCount; i++ { + tok := fmt.Sprintf("%v-%v", jwtToken, i) + r := cacheRecord{ + resp: &authenticator.Response{ + User: &user.DefaultInfo{Name: fmt.Sprintf("holder of token %v", i)}, + }, + } + // make different combinations of audience, failures, denies for the tokens. + auds := []string{} + for i := 0; i < mathrand.Intn(4); i++ { + auds = append(auds, string(uuid.NewUUID())) + } + choice := mathrand.Intn(1000) + switch { + case choice < 900: + r.ok = true + r.err = nil + case choice < 990: + r.ok = false + r.err = nil + default: + r.ok = false + r.err = fmt.Errorf("I can't think of a clever error name right now") + } + s.tokens = append(s.tokens, tok) + s.tokenToResponse[tok] = &r + if len(auds) > 0 { + s.tokenToAuds[tok] = auds + } + } +} + +func (s *singleBenchmark) lookup(ctx context.Context, token string) (*authenticator.Response, bool, error) { + <-s.chokepoint + defer func() { s.chokepoint <- struct{}{} }() + time.Sleep(1 * time.Millisecond) + r, ok := s.tokenToResponse[token] + if !ok { + panic("test setup problem") + } + return r.resp, r.ok, r.err +} + +// To prevent contention over a channel, and to minimize the case where some +// goroutines finish before others, vary the number of goroutines and batch +// size based on the benchmark size. +func (s *singleBenchmark) queueBatches() (<-chan int, int) { + batchSize := 1 + threads := 1 + + switch { + case s.b.N < 5000: + threads = s.b.N + batchSize = 1 + default: + threads = 5000 + batchSize = s.b.N / (threads * 10) + if batchSize < 1 { + batchSize = 1 + } + } + + batches := make(chan int, threads*2) + s.wg.Add(1) + go func() { + defer s.wg.Done() + defer close(batches) + remaining := s.b.N + for remaining > batchSize { + batches <- batchSize + remaining -= batchSize + } + batches <- remaining + }() + + return batches, threads +} + +func (s *singleBenchmark) doAuthForTokenN(n int, a authenticator.Token) { + tok := s.tokens[n] + auds := s.tokenToAuds[tok] + ctx := context.Background() + ctx = authenticator.WithAudiences(ctx, auds) + a.AuthenticateToken(ctx, tok) +} + +func (s *singleBenchmark) bench(b *testing.B) { + s.b = b + a := newWithClock( + authenticator.TokenFunc(s.lookup), + true, + 4*time.Second, + 500*time.Millisecond, + utilclock.RealClock{}, + ) + const maxInFlight = 40 + s.chokepoint = make(chan struct{}, maxInFlight) + for i := 0; i < maxInFlight; i++ { + s.chokepoint <- struct{}{} + } + + batches, threadCount := s.queueBatches() + s.b.ResetTimer() + + for i := 0; i < threadCount; i++ { + s.wg.Add(1) + go func() { + defer s.wg.Done() + // don't contend over the lock for the global rand.Rand + r := mathrand.New(mathrand.NewSource(mathrand.Int63())) + for count := range batches { + for i := 0; i < count; i++ { + // some problems appear with random + // access, some appear with many + // requests for a single entry, so we + // do both. + s.doAuthForTokenN(r.Intn(len(s.tokens)), a) + s.doAuthForTokenN(0, a) + } + } + }() + } + + s.wg.Wait() +}