Merge pull request #84424 from mikedanese/expcache
Add an expiring cache for the caching token authenticator
This commit is contained in: commit 19b4017b5d
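For orientation before the hunks below: the change threads a context.Context into the caching token authenticator so the new expiring cache can run, and later stop, a background GC goroutine. The following is a minimal caller-side sketch, not part of the diff; the tokencache import path is an assumption (the diff shows only the call sites), and the dummy authenticator is illustrative.

package main

import (
	"context"
	"time"

	"k8s.io/apiserver/pkg/authentication/authenticator"
	// Assumed import path for the token cache package touched in this PR.
	tokencache "k8s.io/apiserver/pkg/authentication/token/cache"
)

func main() {
	// The context now bounds the lifetime of the cache's GC goroutine; the
	// production call sites in this diff still pass context.TODO().
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A stand-in for the real (slow) token authenticator being cached.
	var slow authenticator.Token = authenticator.TokenFunc(
		func(ctx context.Context, token string) (*authenticator.Response, bool, error) {
			return nil, false, nil
		})

	// New signature after this PR: the context is the new first argument.
	cached := tokencache.New(ctx, slow, false, 10*time.Second, 10*time.Second)
	_ = cached
}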
@@ -17,6 +17,7 @@ limitations under the License.
package authenticator

import (
+	"context"
	"time"

	"github.com/go-openapi/spec"
@@ -192,7 +193,7 @@ func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, er
	tokenAuth := tokenunion.New(tokenAuthenticators...)
	// Optionally cache authentication results
	if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
-		tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
+		tokenAuth = tokencache.New(context.TODO(), tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
	}
	authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
	securityDefinitions["BearerToken"] = &spec.SecurityScheme{
@@ -312,5 +313,5 @@ func newWebhookTokenAuthenticator(webhookConfigFile string, version string, ttl
		return nil, err
	}

-	return tokencache.New(webhookTokenAuthenticator, false, ttl, ttl), nil
+	return tokencache.New(context.TODO(), webhookTokenAuthenticator, false, ttl, ttl), nil
}
@@ -8,20 +8,30 @@ load(

go_test(
    name = "go_default_test",
-    srcs = ["lruexpirecache_test.go"],
+    srcs = [
+        "expiring_test.go",
+        "lruexpirecache_test.go",
+    ],
    embed = [":go_default_library"],
    deps = [
+        "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
        "//vendor/github.com/golang/groupcache/lru:go_default_library",
+        "//vendor/github.com/google/uuid:go_default_library",
    ],
)

go_library(
    name = "go_default_library",
-    srcs = ["lruexpirecache.go"],
+    srcs = [
+        "expiring.go",
+        "lruexpirecache.go",
+    ],
    importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/cache",
    importpath = "k8s.io/apimachinery/pkg/util/cache",
-    deps = ["//vendor/github.com/hashicorp/golang-lru:go_default_library"],
+    deps = [
+        "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
+        "//vendor/github.com/hashicorp/golang-lru:go_default_library",
+    ],
)

filegroup(
staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go (vendored, new file, 208 lines)
@@ -0,0 +1,208 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"container/heap"
	"context"
	"sync"
	"time"

	utilclock "k8s.io/apimachinery/pkg/util/clock"
)

// NewExpiring returns an initialized expiring cache. Users must call
// (*Expiring).Run() to begin the GC goroutine.
func NewExpiring() *Expiring {
	return NewExpiringWithClock(utilclock.RealClock{})
}

// NewExpiringWithClock is like NewExpiring but allows passing in a custom
// clock for testing.
func NewExpiringWithClock(clock utilclock.Clock) *Expiring {
	return &Expiring{
		clock: clock,
		cache: make(map[interface{}]entry),
	}
}

// Expiring is a map whose entries expire after a per-entry timeout.
type Expiring struct {
	clock utilclock.Clock

	// mu protects the below fields
	mu sync.RWMutex
	// cache is the internal map that backs the cache.
	cache map[interface{}]entry
	// generation is used as a cheap resource version for cache entries. Cleanups
	// are scheduled with a key and generation. When the cleanup runs, it first
	// compares its generation with the current generation of the entry. It
	// deletes the entry iff the generation matches. This prevents cleanups
	// scheduled for earlier versions of an entry from deleting later versions of
	// an entry when Set() is called multiple times with the same key.
	//
	// The integer value of the generation of an entry is meaningless.
	generation uint64

	heap expiringHeap
}

type entry struct {
	val        interface{}
	expiry     time.Time
	generation uint64
}

// Get looks up an entry in the cache.
func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, ok := c.cache[key]
	if !ok || c.clock.Now().After(e.expiry) {
		return nil, false
	}
	return e.val, true
}

// Set sets a key/value/expiry entry in the map, overwriting any previous entry
// with the same key. The entry expires at the given expiry time, but its TTL
// may be lengthened or shortened by additional calls to Set().
func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) {
	expiry := c.clock.Now().Add(ttl)

	c.mu.Lock()
	defer c.mu.Unlock()

	c.generation++

	c.cache[key] = entry{
		val:        val,
		expiry:     expiry,
		generation: c.generation,
	}

	heap.Push(&c.heap, &expiringHeapEntry{
		key:        key,
		generation: c.generation,
		expiry:     expiry,
	})
}

// Delete deletes an entry in the map.
func (c *Expiring) Delete(key interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.del(key, 0)
}

// del deletes the entry for the given key. The generation argument is the
// generation of the entry that should be deleted. If the generation has been
// changed (e.g. if a set has occurred on an existing element but the old
// cleanup still runs), this is a noop. If the generation argument is 0, the
// entry's generation is ignored and the entry is deleted.
//
// del must be called under the write lock.
func (c *Expiring) del(key interface{}, generation uint64) {
	e, ok := c.cache[key]
	if !ok {
		return
	}
	if generation != 0 && generation != e.generation {
		return
	}
	delete(c.cache, key)
}

// Len returns the number of items in the cache.
func (c *Expiring) Len() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.cache)
}

const gcInterval = 50 * time.Millisecond

// Run runs the GC goroutine. The goroutine exits when the passed in context is
// cancelled.
func (c *Expiring) Run(ctx context.Context) {
	t := c.clock.NewTicker(gcInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C():
			c.gc()
		case <-ctx.Done():
			return
		}
	}
}
func (c *Expiring) gc() {
	now := c.clock.Now()

	c.mu.Lock()
	defer c.mu.Unlock()
	for {
		// Return from gc if the heap is empty or the next element is not yet
		// expired.
		//
		// heap[0] is a peek at the next element in the heap, which is not obvious
		// from looking at the (*expiringHeap).Pop() implementation below.
		// heap.Pop() swaps the first entry with the last entry of the heap, then
		// calls (*expiringHeap).Pop() which returns the last element.
		if len(c.heap) == 0 || now.Before(c.heap[0].expiry) {
			return
		}
		cleanup := heap.Pop(&c.heap).(*expiringHeapEntry)
		c.del(cleanup.key, cleanup.generation)
	}
}

type expiringHeapEntry struct {
	key        interface{}
	generation uint64
	expiry     time.Time
}

// expiringHeap is a min-heap ordered by expiration time of its entries. The
// expiring cache uses this as a priority queue to efficiently organize entries
// to be garbage collected once they expire.
type expiringHeap []*expiringHeapEntry

var _ heap.Interface = &expiringHeap{}

func (cq expiringHeap) Len() int {
	return len(cq)
}

func (cq expiringHeap) Less(i, j int) bool {
	return cq[i].expiry.Before(cq[j].expiry)
}

func (cq expiringHeap) Swap(i, j int) {
	cq[i], cq[j] = cq[j], cq[i]
}

func (cq *expiringHeap) Push(c interface{}) {
	*cq = append(*cq, c.(*expiringHeapEntry))
}

func (cq *expiringHeap) Pop() interface{} {
	c := (*cq)[cq.Len()-1]
	*cq = (*cq)[:cq.Len()-1]
	return c
}
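A minimal usage sketch of the Expiring cache defined above, using only the API shown in this file; the key, value, and TTL are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	utilcache "k8s.io/apimachinery/pkg/util/cache"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// NewExpiring returns the cache; Run starts the GC goroutine, which sweeps
	// expired entries every gcInterval (50ms) until ctx is cancelled.
	c := utilcache.NewExpiring()
	go c.Run(ctx)

	c.Set("token-hash", "cached response", 10*time.Second)
	if v, ok := c.Get("token-hash"); ok {
		fmt.Println("hit:", v) // Get also checks the entry's expiry itself
	}
	c.Delete("token-hash")
	fmt.Println("len:", c.Len())
}

Calling Set again for the same key bumps the entry's generation, so a cleanup queued for the older entry is a no-op against the newer one; that is the race the generation field guards.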
staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go (vendored, new file, 197 lines)
@@ -0,0 +1,197 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"context"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/google/uuid"

	utilclock "k8s.io/apimachinery/pkg/util/clock"
)

func TestExpiringCache(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cache := NewExpiring()
	go cache.Run(ctx)

	if result, ok := cache.Get("foo"); ok || result != nil {
		t.Errorf("Expected null, false, got %#v, %v", result, ok)
	}

	record1 := "bob"
	record2 := "alice"

	// when empty, record is stored
	cache.Set("foo", record1, time.Hour)
	if result, ok := cache.Get("foo"); !ok || result != record1 {
		t.Errorf("Expected %#v, true, got %#v, %v", record1, result, ok)
	}

	// newer record overrides
	cache.Set("foo", record2, time.Hour)
	if result, ok := cache.Get("foo"); !ok || result != record2 {
		t.Errorf("Expected %#v, true, got %#v, %v", record2, result, ok)
	}

	// delete the current value
	cache.Delete("foo")
	if result, ok := cache.Get("foo"); ok || result != nil {
		t.Errorf("Expected null, false, got %#v, %v", result, ok)
	}
}

func TestExpiration(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fc := &utilclock.FakeClock{}
	c := NewExpiringWithClock(fc)
	go c.Run(ctx)

	c.Set("a", "a", time.Second)

	fc.Step(500 * time.Millisecond)
	if _, ok := c.Get("a"); !ok {
		t.Fatalf("we should have found a key")
	}

	fc.Step(time.Second)
	if _, ok := c.Get("a"); ok {
		t.Fatalf("we should not have found a key")
	}

	c.Set("a", "a", time.Second)

	fc.Step(500 * time.Millisecond)
	if _, ok := c.Get("a"); !ok {
		t.Fatalf("we should have found a key")
	}

	// reset should restart the ttl
	c.Set("a", "a", time.Second)

	fc.Step(750 * time.Millisecond)
	if _, ok := c.Get("a"); !ok {
		t.Fatalf("we should have found a key")
	}

	// Simulate a race between a reset and cleanup. Assert that del doesn't
	// remove the key.
	c.Set("a", "a", time.Second)

	c.mu.Lock()
	e := c.cache["a"]
	e.generation++
	e.expiry = e.expiry.Add(1 * time.Second)
	c.cache["a"] = e
	c.mu.Unlock()

	fc.Step(1 * time.Second)
	if _, ok := c.Get("a"); !ok {
		t.Fatalf("we should have found a key")
	}
}
func BenchmarkExpiringCacheContention(b *testing.B) {
	b.Run("evict_probability=100%", func(b *testing.B) {
		benchmarkExpiringCacheContention(b, 1)
	})
	b.Run("evict_probability=10%", func(b *testing.B) {
		benchmarkExpiringCacheContention(b, 0.1)
	})
	b.Run("evict_probability=1%", func(b *testing.B) {
		benchmarkExpiringCacheContention(b, 0.01)
	})
}

func benchmarkExpiringCacheContention(b *testing.B, prob float64) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const numKeys = 1 << 16
	cache := NewExpiring()
	go cache.Run(ctx)

	keys := []string{}
	for i := 0; i < numKeys; i++ {
		key := uuid.New().String()
		keys = append(keys, key)
	}

	b.ResetTimer()

	b.SetParallelism(256)
	b.RunParallel(func(pb *testing.PB) {
		rand := rand.New(rand.NewSource(rand.Int63()))
		for pb.Next() {
			i := rand.Int31()
			key := keys[i%numKeys]
			_, ok := cache.Get(key)
			if ok {
				// Evict the key with the configured probability.
				if rand.Float64() < prob {
					cache.Delete(key)
				}
			} else {
				cache.Set(key, struct{}{}, 50*time.Millisecond)
			}
		}
	})
}
func TestStressExpiringCache(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	const numKeys = 1 << 16
	cache := NewExpiring()
	go cache.Run(ctx)

	keys := []string{}
	for i := 0; i < numKeys; i++ {
		key := uuid.New().String()
		keys = append(keys, key)
	}

	var wg sync.WaitGroup
	for i := 0; i < 256; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			rand := rand.New(rand.NewSource(rand.Int63()))
			for {
				select {
				case <-ctx.Done():
					return
				default:
				}
				key := keys[rand.Intn(numKeys)]
				if _, ok := cache.Get(key); !ok {
					cache.Set(key, struct{}{}, time.Second)
				}
			}
		}()
	}

	wg.Wait()
}
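The tests above drive expiry deterministically with a fake clock; a short sketch of that pattern follows, assuming only the NewExpiringWithClock and FakeClock APIs shown in this diff. Note that Get checks the entry's expiry itself, so a miss is observable even before the GC goroutine sweeps.

package main

import (
	"fmt"
	"time"

	utilcache "k8s.io/apimachinery/pkg/util/cache"
	utilclock "k8s.io/apimachinery/pkg/util/clock"
)

func main() {
	fc := utilclock.NewFakeClock(time.Now())
	c := utilcache.NewExpiringWithClock(fc)

	c.Set("a", "a", time.Second)

	fc.Step(500 * time.Millisecond)
	_, ok := c.Get("a")
	fmt.Println(ok) // true: still inside the 1s TTL

	fc.Step(time.Second)
	_, ok = c.Get("a")
	fmt.Println(ok) // false: Get sees the expiry even without a GC sweep
}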
@@ -40,6 +40,7 @@ filegroup(
        "//staging/src/k8s.io/apiserver/pkg/server:all-srcs",
        "//staging/src/k8s.io/apiserver/pkg/storage:all-srcs",
        "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
+        "//staging/src/k8s.io/apiserver/pkg/util/cache:all-srcs",
        "//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
        "//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs",
        "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
@@ -17,6 +17,7 @@ limitations under the License.
package authenticatorfactory

import (
+	"context"
	"errors"
	"time"

@@ -83,7 +84,7 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur
	if err != nil {
		return nil, nil, err
	}
-	cachingTokenAuth := cache.New(tokenAuth, false, c.CacheTTL, c.CacheTTL)
+	cachingTokenAuth := cache.New(context.TODO(), tokenAuth, false, c.CacheTTL, c.CacheTTL)
	authenticators = append(authenticators, bearertoken.New(cachingTokenAuth), websocket.NewProtocolAuthenticator(cachingTokenAuth))

	securityDefinitions["BearerToken"] = &spec.SecurityScheme{
@@ -17,22 +17,25 @@ limitations under the License.
package cache

import (
+	"context"
	"time"

-	lrucache "k8s.io/apimachinery/pkg/util/cache"
+	utilcache "k8s.io/apimachinery/pkg/util/cache"
	"k8s.io/apimachinery/pkg/util/clock"
)

type simpleCache struct {
-	lru *lrucache.LRUExpireCache
+	cache *utilcache.Expiring
}

-func newSimpleCache(size int, clock clock.Clock) cache {
-	return &simpleCache{lru: lrucache.NewLRUExpireCacheWithClock(size, clock)}
+func newSimpleCache(ctx context.Context, clock clock.Clock) cache {
+	c := &simpleCache{cache: utilcache.NewExpiringWithClock(clock)}
+	go c.cache.Run(ctx)
+	return c
}

func (c *simpleCache) get(key string) (*cacheRecord, bool) {
-	record, ok := c.lru.Get(key)
+	record, ok := c.cache.Get(key)
	if !ok {
		return nil, false
	}
@@ -41,9 +44,9 @@ func (c *simpleCache) get(key string) (*cacheRecord, bool) {
}

func (c *simpleCache) set(key string, value *cacheRecord, ttl time.Duration) {
-	c.lru.Add(key, value, ttl)
+	c.cache.Set(key, value, ttl)
}

func (c *simpleCache) remove(key string) {
-	c.lru.Remove(key)
+	c.cache.Delete(key)
}
@@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
+	"context"
	"fmt"
	"math/rand"
	"testing"
@@ -30,7 +31,9 @@ import (
)

func TestSimpleCache(t *testing.T) {
-	testCache(newSimpleCache(4096, clock.RealClock{}), t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	testCache(newSimpleCache(ctx, clock.RealClock{}), t)
}

// Note: the performance profile of this benchmark may not match that in production.
@@ -39,16 +42,22 @@ func TestSimpleCache(t *testing.T) {
func BenchmarkCacheContentions(b *testing.B) {
	for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} {
		b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) {
-			benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			benchmarkCache(newSimpleCache(ctx, clock.RealClock{}), b, numKeys)
		})
		b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) {
-			benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), b, numKeys)
		})
	}
}

func TestStripedCache(t *testing.T) {
-	testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), t)
}

func benchmarkCache(cache cache, b *testing.B, numKeys int) {
|
||||
}
|
||||
|
||||
// New returns a token authenticator that caches the results of the specified authenticator. A ttl of 0 bypasses the cache.
|
||||
func New(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token {
|
||||
return newWithClock(authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{})
|
||||
func New(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token {
|
||||
return newWithClock(ctx, authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{})
|
||||
}
|
||||
|
||||
func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token {
|
||||
func newWithClock(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token {
|
||||
randomCacheKey := make([]byte, 32)
|
||||
if _, err := rand.Read(randomCacheKey); err != nil {
|
||||
panic(err) // rand should never fail
|
||||
@ -86,7 +86,7 @@ func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL,
|
||||
// used. Currently we advertise support 5k nodes and 10k
|
||||
// namespaces; a 32k entry cache is therefore a 2x safety
|
||||
// margin.
|
||||
cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(1024, clock) }),
|
||||
cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock) }),
|
||||
|
||||
hashPool: &sync.Pool{
|
||||
New: func() interface{} {
|
||||
|
@@ -50,7 +50,10 @@ func TestCachedTokenAuthenticator(t *testing.T) {
	})
	fakeClock := utilclock.NewFakeClock(time.Now())

-	a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock)

	calledWithToken, resultUsers, resultOk, resultErr = []string{}, nil, false, nil
	a.AuthenticateToken(context.Background(), "bad1")
@@ -124,7 +127,10 @@ func TestCachedTokenAuthenticatorWithAudiences(t *testing.T) {
	})
	fakeClock := utilclock.NewFakeClock(time.Now())

-	a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock)

	resultUsers["audAusertoken1"] = &user.DefaultInfo{Name: "user1"}
	resultUsers["audBusertoken1"] = &user.DefaultInfo{Name: "user1-different"}
@@ -270,6 +276,8 @@ func (s *singleBenchmark) run(b *testing.B) {
}

func (s *singleBenchmark) bench(b *testing.B) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
	// Simulate slowness, qps limit, external service limitation, etc
	const maxInFlight = 40
	chokepoint := make(chan struct{}, maxInFlight)
@@ -277,6 +285,7 @@ func (s *singleBenchmark) bench(b *testing.B) {
	var lookups uint64

	a := newWithClock(
+		ctx,
		authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) {
			atomic.AddUint64(&lookups, 1)
staging/src/k8s.io/apiserver/pkg/util/cache/BUILD (vendored, new file, 13 lines)
@@ -0,0 +1,13 @@
filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)

filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
    visibility = ["//visibility:public"],
)
@@ -170,7 +170,7 @@ func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode }

// newV1TokenAuthenticator creates a temporary kubeconfig file from the provided
// arguments and attempts to load a new WebhookTokenAuthenticator from it.
-func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
+func newV1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
	tempfile, err := ioutil.TempFile("", "")
	if err != nil {
		return nil, err
@@ -203,7 +203,7 @@ func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte,
		return nil, err
	}

-	return cache.New(authn, false, cacheTime, cacheTime), nil
+	return cache.New(ctx, authn, false, cacheTime, cacheTime), nil
}

func TestV1TLSConfig(t *testing.T) {
@@ -259,7 +259,10 @@ func TestV1TLSConfig(t *testing.T) {
			}
			defer server.Close()

-			wh, err := newV1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			wh, err := newV1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
			if err != nil {
				t.Errorf("%s: failed to create client: %v", tt.test, err)
				return
@@ -482,12 +485,14 @@ func TestV1WebhookTokenAuthenticator(t *testing.T) {
	token := "my-s3cr3t-t0ken"
	for _, tt := range tests {
		t.Run(tt.description, func(t *testing.T) {
-			wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
			if err != nil {
				t.Fatal(err)
			}

-			ctx := context.Background()
			if tt.reqAuds != nil {
				ctx = authenticator.WithAudiences(ctx, tt.reqAuds)
			}
@@ -554,8 +559,11 @@ func TestV1WebhookCacheAndRetry(t *testing.T) {
	}
	defer s.Close()

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
	// Create an authenticator that caches successful responses "forever" (100 days).
-	wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
+	wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
	if err != nil {
		t.Fatal(err)
	}
@@ -172,7 +172,7 @@ func (m *mockV1beta1Service) HTTPStatusCode() int { return m.statusCode }

// newV1beta1TokenAuthenticator creates a temporary kubeconfig file from the provided
// arguments and attempts to load a new WebhookTokenAuthenticator from it.
-func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
+func newV1beta1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
	tempfile, err := ioutil.TempFile("", "")
	if err != nil {
		return nil, err
@@ -205,7 +205,7 @@ func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []
		return nil, err
	}

-	return cache.New(authn, false, cacheTime, cacheTime), nil
+	return cache.New(ctx, authn, false, cacheTime, cacheTime), nil
}

func TestV1beta1TLSConfig(t *testing.T) {
@@ -261,7 +261,10 @@ func TestV1beta1TLSConfig(t *testing.T) {
			}
			defer server.Close()

-			wh, err := newV1beta1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			wh, err := newV1beta1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
			if err != nil {
				t.Errorf("%s: failed to create client: %v", tt.test, err)
				return
@@ -484,12 +487,14 @@ func TestV1beta1WebhookTokenAuthenticator(t *testing.T) {
	token := "my-s3cr3t-t0ken"
	for _, tt := range tests {
		t.Run(tt.description, func(t *testing.T) {
-			wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
			if err != nil {
				t.Fatal(err)
			}

-			ctx := context.Background()
			if tt.reqAuds != nil {
				ctx = authenticator.WithAudiences(ctx, tt.reqAuds)
			}
@@ -556,8 +561,11 @@ func TestV1beta1WebhookCacheAndRetry(t *testing.T) {
	}
	defer s.Close()

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
	// Create an authenticator that caches successful responses "forever" (100 days).
-	wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
+	wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
	if err != nil {
		t.Fatal(err)
	}
staging/src/k8s.io/cloud-provider/go.sum (generated, 1 addition)
@@ -51,6 +51,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
staging/src/k8s.io/kubectl/go.sum (generated, 1 addition)
@@ -115,6 +115,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
staging/src/k8s.io/node-api/go.sum (generated, 1 addition)
@@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
staging/src/k8s.io/sample-controller/go.sum (generated, 1 addition)
@@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
@@ -70,7 +70,7 @@ func getTestTokenAuth() authenticator.Request {
	return group.NewGroupAdder(bearertoken.New(tokenAuthenticator), []string{user.AllAuthenticated})
}

-func getTestWebhookTokenAuth(serverURL string) (authenticator.Request, error) {
+func getTestWebhookTokenAuth(ctx context.Context, serverURL string) (authenticator.Request, error) {
	kubecfgFile, err := ioutil.TempFile("", "webhook-kubecfg")
	if err != nil {
		return nil, err
@@ -90,7 +90,7 @@ func getTestWebhookTokenAuth(serverURL string) (authenticator.Request, error) {
	if err != nil {
		return nil, err
	}
-	return bearertoken.New(cache.New(webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil
+	return bearertoken.New(cache.New(ctx, webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil
}

func path(resource, namespace, name string) string {
@@ -1176,7 +1176,11 @@ func TestReadOnlyAuthorization(t *testing.T) {
func TestWebhookTokenAuthenticator(t *testing.T) {
	authServer := newTestWebhookTokenAuthServer()
	defer authServer.Close()
-	authenticator, err := getTestWebhookTokenAuth(authServer.URL)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	authenticator, err := getTestWebhookTokenAuth(ctx, authServer.URL)
	if err != nil {
		t.Fatalf("error starting webhook token authenticator server: %v", err)
	}