diff --git a/pkg/kubeapiserver/authenticator/config.go b/pkg/kubeapiserver/authenticator/config.go index 0e27620d603..5a52c22630b 100644 --- a/pkg/kubeapiserver/authenticator/config.go +++ b/pkg/kubeapiserver/authenticator/config.go @@ -17,6 +17,7 @@ limitations under the License. package authenticator import ( + "context" "time" "github.com/go-openapi/spec" @@ -192,7 +193,7 @@ func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, er tokenAuth := tokenunion.New(tokenAuthenticators...) // Optionally cache authentication results if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 { - tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL) + tokenAuth = tokencache.New(context.TODO(), tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL) } authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth)) securityDefinitions["BearerToken"] = &spec.SecurityScheme{ @@ -312,5 +313,5 @@ func newWebhookTokenAuthenticator(webhookConfigFile string, version string, ttl return nil, err } - return tokencache.New(webhookTokenAuthenticator, false, ttl, ttl), nil + return tokencache.New(context.TODO(), webhookTokenAuthenticator, false, ttl, ttl), nil } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD b/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD index 85a1f3d95f1..ff280d456ab 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD @@ -8,20 +8,30 @@ load( go_test( name = "go_default_test", - srcs = ["lruexpirecache_test.go"], + srcs = [ + "expiring_test.go", + "lruexpirecache_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/github.com/golang/groupcache/lru:go_default_library", + "//vendor/github.com/google/uuid:go_default_library", ], ) go_library( name = "go_default_library", - srcs = ["lruexpirecache.go"], + srcs = [ + "expiring.go", + "lruexpirecache.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/cache", importpath = "k8s.io/apimachinery/pkg/util/cache", - deps = ["//vendor/github.com/hashicorp/golang-lru:go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/github.com/hashicorp/golang-lru:go_default_library", + ], ) filegroup( diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go new file mode 100644 index 00000000000..e9ebfc877da --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go @@ -0,0 +1,208 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "container/heap" + "context" + "sync" + "time" + + utilclock "k8s.io/apimachinery/pkg/util/clock" +) + +// NewExpiring returns an initialized expiring cache. Users must call +// (*Expiring).Run() to begin the GC goroutine. +func NewExpiring() *Expiring { + return NewExpiringWithClock(utilclock.RealClock{}) +} + +// NewExpiringWithClock is like NewExpiring but allows passing in a custom +// clock for testing. +func NewExpiringWithClock(clock utilclock.Clock) *Expiring { + return &Expiring{ + clock: clock, + cache: make(map[interface{}]entry), + } +} + +// Expiring is a map whose entries expire after a per-entry timeout. +type Expiring struct { + clock utilclock.Clock + + // mu protects the below fields + mu sync.RWMutex + // cache is the internal map that backs the cache. + cache map[interface{}]entry + // generation is used as a cheap resource version for cache entries. Cleanups + // are scheduled with a key and generation. When the cleanup runs, it first + // compares its generation with the current generation of the entry. It + // deletes the entry iff the generation matches. This prevents cleanups + // scheduled for earlier versions of an entry from deleting later versions of + // an entry when Set() is called multiple times with the same key. + // + // The integer value of the generation of an entry is meaningless. + generation uint64 + + heap expiringHeap +} + +type entry struct { + val interface{} + expiry time.Time + generation uint64 +} + +// Get looks up an entry in the cache. +func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) { + c.mu.RLock() + defer c.mu.RUnlock() + e, ok := c.cache[key] + if !ok || c.clock.Now().After(e.expiry) { + return nil, false + } + return e.val, true +} + +// Set sets a key/value/expiry entry in the map, overwriting any previous entry +// with the same key. The entry expires at the given expiry time, but its TTL +// may be lengthened or shortened by additional calls to Set(). +func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) { + expiry := c.clock.Now().Add(ttl) + + c.mu.Lock() + defer c.mu.Unlock() + + c.generation++ + + c.cache[key] = entry{ + val: val, + expiry: expiry, + generation: c.generation, + } + + heap.Push(&c.heap, &expiringHeapEntry{ + key: key, + generation: c.generation, + expiry: expiry, + }) +} + +// Delete deletes an entry in the map. +func (c *Expiring) Delete(key interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + c.del(key, 0) +} + +// del deletes the entry for the given key. The generation argument is the +// generation of the entry that should be deleted. If the generation has been +// changed (e.g. if a set has occurred on an existing element but the old +// cleanup still runs), this is a noop. If the generation argument is 0, the +// entry's generation is ignored and the entry is deleted. +// +// del must be called under the write lock. +func (c *Expiring) del(key interface{}, generation uint64) { + e, ok := c.cache[key] + if !ok { + return + } + if generation != 0 && generation != e.generation { + return + } + delete(c.cache, key) +} + +// Len returns the number of items in the cache. +func (c *Expiring) Len() int { + c.mu.RLock() + defer c.mu.RUnlock() + return len(c.cache) +} + +const gcInterval = 50 * time.Millisecond + +// Run runs the GC goroutine. The goroutine exits when the passed in context is +// cancelled. +func (c *Expiring) Run(ctx context.Context) { + t := c.clock.NewTicker(gcInterval) + defer t.Stop() + for { + select { + case <-t.C(): + c.gc() + case <-ctx.Done(): + return + } + } +} + +func (c *Expiring) gc() { + now := c.clock.Now() + + c.mu.Lock() + defer c.mu.Unlock() + for { + // Return from gc if the heap is empty or the next element is not yet + // expired. + // + // heap[0] is a peek at the next element in the heap, which is not obvious + // from looking at the (*expiringHeap).Pop() implmentation below. + // heap.Pop() swaps the first entry with the last entry of the heap, then + // calls (*expiringHeap).Pop() which returns the last element. + if len(c.heap) == 0 || now.After(c.heap[0].expiry) { + return + } + cleanup := heap.Pop(&c.heap).(*expiringHeapEntry) + c.del(cleanup.key, cleanup.generation) + } +} + +type expiringHeapEntry struct { + key interface{} + generation uint64 + expiry time.Time +} + +// expiringHeap is a min-heap ordered by expiration time of it's entries. The +// expiring cache uses this as a priority queue efficiently organize entries to +// be garbage collected once they expire. +type expiringHeap []*expiringHeapEntry + +var _ heap.Interface = &expiringHeap{} + +func (cq expiringHeap) Len() int { + return len(cq) +} + +func (cq expiringHeap) Less(i, j int) bool { + return cq[i].expiry.Before(cq[j].expiry) +} + +func (cq expiringHeap) Swap(i, j int) { + cq[i], cq[j] = cq[j], cq[i] +} + +func (cq *expiringHeap) Push(c interface{}) { + *cq = append(*cq, c.(*expiringHeapEntry)) +} + +func (cq *expiringHeap) Pop() interface{} { + c := (*cq)[cq.Len()-1] + *cq = (*cq)[:cq.Len()-1] + return c +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go new file mode 100644 index 00000000000..e3114959113 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "math/rand" + "sync" + "testing" + "time" + + "github.com/google/uuid" + + utilclock "k8s.io/apimachinery/pkg/util/clock" +) + +func TestExpiringCache(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cache := NewExpiring() + go cache.Run(ctx) + + if result, ok := cache.Get("foo"); ok || result != nil { + t.Errorf("Expected null, false, got %#v, %v", result, ok) + } + + record1 := "bob" + record2 := "alice" + + // when empty, record is stored + cache.Set("foo", record1, time.Hour) + if result, ok := cache.Get("foo"); !ok || result != record1 { + t.Errorf("Expected %#v, true, got %#v, %v", record1, result, ok) + } + + // newer record overrides + cache.Set("foo", record2, time.Hour) + if result, ok := cache.Get("foo"); !ok || result != record2 { + t.Errorf("Expected %#v, true, got %#v, %v", record2, result, ok) + } + + // delete the current value + cache.Delete("foo") + if result, ok := cache.Get("foo"); ok || result != nil { + t.Errorf("Expected null, false, got %#v, %v", result, ok) + } +} + +func TestExpiration(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fc := &utilclock.FakeClock{} + c := NewExpiringWithClock(fc) + go c.Run(ctx) + + c.Set("a", "a", time.Second) + + fc.Step(500 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + fc.Step(time.Second) + if _, ok := c.Get("a"); ok { + t.Fatalf("we should not have found a key") + } + + c.Set("a", "a", time.Second) + + fc.Step(500 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + // reset should restart the ttl + c.Set("a", "a", time.Second) + + fc.Step(750 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + // Simulate a race between a reset and cleanup. Assert that del doesn't + // remove the key. + c.Set("a", "a", time.Second) + + c.mu.Lock() + e := c.cache["a"] + e.generation++ + e.expiry = e.expiry.Add(1 * time.Second) + c.cache["a"] = e + c.mu.Unlock() + + fc.Step(1 * time.Second) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } +} + +func BenchmarkExpiringCacheContention(b *testing.B) { + b.Run("evict_probablility=100%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 1) + }) + b.Run("evict_probablility=10%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 0.1) + }) + b.Run("evict_probablility=1%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 0.01) + }) +} + +func benchmarkExpiringCacheContention(b *testing.B, prob float64) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numKeys = 1 << 16 + cache := NewExpiring() + go cache.Run(ctx) + + keys := []string{} + for i := 0; i < numKeys; i++ { + key := uuid.New().String() + keys = append(keys, key) + } + + b.ResetTimer() + + b.SetParallelism(256) + b.RunParallel(func(pb *testing.PB) { + rand := rand.New(rand.NewSource(rand.Int63())) + for pb.Next() { + i := rand.Int31() + key := keys[i%numKeys] + _, ok := cache.Get(key) + if ok { + // compare lower bits of sampled i to decide whether we should evict. + if rand.Float64() < prob { + cache.Delete(key) + } + } else { + cache.Set(key, struct{}{}, 50*time.Millisecond) + } + } + }) +} + +func TestStressExpiringCache(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + const numKeys = 1 << 16 + cache := NewExpiring() + go cache.Run(ctx) + + keys := []string{} + for i := 0; i < numKeys; i++ { + key := uuid.New().String() + keys = append(keys, key) + } + + var wg sync.WaitGroup + for i := 0; i < 256; i++ { + wg.Add(1) + go func() { + rand := rand.New(rand.NewSource(rand.Int63())) + for { + select { + case <-ctx.Done(): + return + default: + } + key := keys[rand.Intn(numKeys)] + if _, ok := cache.Get(key); !ok { + cache.Set(key, struct{}{}, time.Second) + } + } + }() + } + + wg.Done() +} diff --git a/staging/src/k8s.io/apiserver/BUILD b/staging/src/k8s.io/apiserver/BUILD index a6b511f8dd9..e2ad5df4d77 100644 --- a/staging/src/k8s.io/apiserver/BUILD +++ b/staging/src/k8s.io/apiserver/BUILD @@ -40,6 +40,7 @@ filegroup( "//staging/src/k8s.io/apiserver/pkg/server:all-srcs", "//staging/src/k8s.io/apiserver/pkg/storage:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/cache:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs", diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go b/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go index b9c7e2e6eee..7b384b6bd05 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go @@ -17,6 +17,7 @@ limitations under the License. package authenticatorfactory import ( + "context" "errors" "time" @@ -83,7 +84,7 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur if err != nil { return nil, nil, err } - cachingTokenAuth := cache.New(tokenAuth, false, c.CacheTTL, c.CacheTTL) + cachingTokenAuth := cache.New(context.TODO(), tokenAuth, false, c.CacheTTL, c.CacheTTL) authenticators = append(authenticators, bearertoken.New(cachingTokenAuth), websocket.NewProtocolAuthenticator(cachingTokenAuth)) securityDefinitions["BearerToken"] = &spec.SecurityScheme{ diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go index 18d5692d7a7..5012097343b 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go @@ -17,22 +17,25 @@ limitations under the License. package cache import ( + "context" "time" - lrucache "k8s.io/apimachinery/pkg/util/cache" + utilcache "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/util/clock" ) type simpleCache struct { - lru *lrucache.LRUExpireCache + cache *utilcache.Expiring } -func newSimpleCache(size int, clock clock.Clock) cache { - return &simpleCache{lru: lrucache.NewLRUExpireCacheWithClock(size, clock)} +func newSimpleCache(ctx context.Context, clock clock.Clock) cache { + c := &simpleCache{cache: utilcache.NewExpiringWithClock(clock)} + go c.cache.Run(ctx) + return c } func (c *simpleCache) get(key string) (*cacheRecord, bool) { - record, ok := c.lru.Get(key) + record, ok := c.cache.Get(key) if !ok { return nil, false } @@ -41,9 +44,9 @@ func (c *simpleCache) get(key string) (*cacheRecord, bool) { } func (c *simpleCache) set(key string, value *cacheRecord, ttl time.Duration) { - c.lru.Add(key, value, ttl) + c.cache.Set(key, value, ttl) } func (c *simpleCache) remove(key string) { - c.lru.Remove(key) + c.cache.Delete(key) } diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go index b7fe4cb73cd..1984c4c6b1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "math/rand" "testing" @@ -30,7 +31,9 @@ import ( ) func TestSimpleCache(t *testing.T) { - testCache(newSimpleCache(4096, clock.RealClock{}), t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + testCache(newSimpleCache(ctx, clock.RealClock{}), t) } // Note: the performance profile of this benchmark may not match that in the production. @@ -39,16 +42,22 @@ func TestSimpleCache(t *testing.T) { func BenchmarkCacheContentions(b *testing.B) { for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} { b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) { - benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + benchmarkCache(newSimpleCache(ctx, clock.RealClock{}), b, numKeys) }) b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) { - benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), b, numKeys) }) } } func TestStripedCache(t *testing.T) { - testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), t) } func benchmarkCache(cache cache, b *testing.B, numKeys int) { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go index d94866d5baa..19d8e27e120 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go @@ -64,11 +64,11 @@ type cache interface { } // New returns a token authenticator that caches the results of the specified authenticator. A ttl of 0 bypasses the cache. -func New(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { - return newWithClock(authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) +func New(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { + return newWithClock(ctx, authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) } -func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { +func newWithClock(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { randomCacheKey := make([]byte, 32) if _, err := rand.Read(randomCacheKey); err != nil { panic(err) // rand should never fail @@ -86,7 +86,7 @@ func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, // used. Currently we advertise support 5k nodes and 10k // namespaces; a 32k entry cache is therefore a 2x safety // margin. - cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(1024, clock) }), + cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock) }), hashPool: &sync.Pool{ New: func() interface{} { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go index c6fb207e3a7..921f079e9c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go @@ -50,7 +50,10 @@ func TestCachedTokenAuthenticator(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) calledWithToken, resultUsers, resultOk, resultErr = []string{}, nil, false, nil a.AuthenticateToken(context.Background(), "bad1") @@ -124,7 +127,10 @@ func TestCachedTokenAuthenticatorWithAudiences(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) resultUsers["audAusertoken1"] = &user.DefaultInfo{Name: "user1"} resultUsers["audBusertoken1"] = &user.DefaultInfo{Name: "user1-different"} @@ -270,6 +276,8 @@ func (s *singleBenchmark) run(b *testing.B) { } func (s *singleBenchmark) bench(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Simulate slowness, qps limit, external service limitation, etc const maxInFlight = 40 chokepoint := make(chan struct{}, maxInFlight) @@ -277,6 +285,7 @@ func (s *singleBenchmark) bench(b *testing.B) { var lookups uint64 a := newWithClock( + ctx, authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { atomic.AddUint64(&lookups, 1) diff --git a/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD b/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD new file mode 100644 index 00000000000..6df04e38cd7 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD @@ -0,0 +1,13 @@ +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go index 518cf706301..fa02b605b2d 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go @@ -170,7 +170,7 @@ func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode } // newV1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. -func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -203,7 +203,7 @@ func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, return nil, err } - return cache.New(authn, false, cacheTime, cacheTime), nil + return cache.New(ctx, authn, false, cacheTime, cacheTime), nil } func TestV1TLSConfig(t *testing.T) { @@ -259,7 +259,10 @@ func TestV1TLSConfig(t *testing.T) { } defer server.Close() - wh, err := newV1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -482,12 +485,14 @@ func TestV1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } - ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -554,8 +559,11 @@ func TestV1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create an authenticator that caches successful responses "forever" (100 days). - wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go index 5c228b69025..998d6a782e9 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go @@ -172,7 +172,7 @@ func (m *mockV1beta1Service) HTTPStatusCode() int { return m.statusCode } // newV1beta1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. -func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1beta1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -205,7 +205,7 @@ func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca [] return nil, err } - return cache.New(authn, false, cacheTime, cacheTime), nil + return cache.New(ctx, authn, false, cacheTime, cacheTime), nil } func TestV1beta1TLSConfig(t *testing.T) { @@ -261,7 +261,10 @@ func TestV1beta1TLSConfig(t *testing.T) { } defer server.Close() - wh, err := newV1beta1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1beta1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -484,12 +487,14 @@ func TestV1beta1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } - ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -556,8 +561,11 @@ func TestV1beta1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create an authenticator that caches successful responses "forever" (100 days). - wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/cloud-provider/go.sum b/staging/src/k8s.io/cloud-provider/go.sum index 8ad9a696c8d..425d1930297 100644 --- a/staging/src/k8s.io/cloud-provider/go.sum +++ b/staging/src/k8s.io/cloud-provider/go.sum @@ -51,6 +51,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/kubectl/go.sum b/staging/src/k8s.io/kubectl/go.sum index 012c405cb54..36aa4c26254 100644 --- a/staging/src/k8s.io/kubectl/go.sum +++ b/staging/src/k8s.io/kubectl/go.sum @@ -115,6 +115,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/node-api/go.sum b/staging/src/k8s.io/node-api/go.sum index c3966220e8d..c18523feeef 100644 --- a/staging/src/k8s.io/node-api/go.sum +++ b/staging/src/k8s.io/node-api/go.sum @@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/sample-controller/go.sum b/staging/src/k8s.io/sample-controller/go.sum index f816d41aa46..e93eaec773f 100644 --- a/staging/src/k8s.io/sample-controller/go.sum +++ b/staging/src/k8s.io/sample-controller/go.sum @@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/test/integration/auth/auth_test.go b/test/integration/auth/auth_test.go index 852c2228b69..8029a05568b 100644 --- a/test/integration/auth/auth_test.go +++ b/test/integration/auth/auth_test.go @@ -70,7 +70,7 @@ func getTestTokenAuth() authenticator.Request { return group.NewGroupAdder(bearertoken.New(tokenAuthenticator), []string{user.AllAuthenticated}) } -func getTestWebhookTokenAuth(serverURL string) (authenticator.Request, error) { +func getTestWebhookTokenAuth(ctx context.Context, serverURL string) (authenticator.Request, error) { kubecfgFile, err := ioutil.TempFile("", "webhook-kubecfg") if err != nil { return nil, err @@ -90,7 +90,7 @@ func getTestWebhookTokenAuth(serverURL string) (authenticator.Request, error) { if err != nil { return nil, err } - return bearertoken.New(cache.New(webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil + return bearertoken.New(cache.New(ctx, webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil } func path(resource, namespace, name string) string { @@ -1176,7 +1176,11 @@ func TestReadOnlyAuthorization(t *testing.T) { func TestWebhookTokenAuthenticator(t *testing.T) { authServer := newTestWebhookTokenAuthServer() defer authServer.Close() - authenticator, err := getTestWebhookTokenAuth(authServer.URL) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + authenticator, err := getTestWebhookTokenAuth(ctx, authServer.URL) if err != nil { t.Fatalf("error starting webhook token authenticator server: %v", err) }