From 9167711fd18511ffc9c90ee306c462be9fc7999b Mon Sep 17 00:00:00 2001
From: Mike Danese
Date: Sat, 26 Oct 2019 12:19:07 -0700
Subject: [PATCH 1/2] Add an expiring cache for the caching token authenticator
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

And maybe the webhook authorizer cache.

This cache has two primary advantages over the LRU cache used currently:

- Cache hits don't acquire an exclusive lock.
- More importantly, performance doesn't fall over when the access pattern
  scans a key space larger than an arbitrary size (e.g. the LRU capacity).

The downside of using an expiring cache here is that it doesn't have a
maximum size, so it's susceptible to DoS when the input is user controlled.
This is not the case for successful authentications, and successful
authentications have a natural expiry, so it might be a good fit here.

It has a few differences compared to:

https://github.com/kubernetes/kubernetes/blob/3d7318f29d9f56810efd3d690811cdea730b5317/staging/src/k8s.io/client-go/tools/cache/expiration_cache.go

- Expiration is not entirely lazy, so keys that are never accessed again are
  still released from the cache.
- It does not acquire an exclusive lock on cache hits.
- It supports per-entry TTLs specified on Set.

The expiring cache (without striping) lands somewhere between the simple
cache and the striped cache in the very contrived contention test where
every iteration acquires a write lock:

```
$ benchstat simple.log expiring.log
name      old time/op    new time/op    delta
Cache-12    2.74µs ± 2%    2.02µs ± 3%  -26.37%  (p=0.000 n=9+9)

name      old alloc/op   new alloc/op   delta
Cache-12      182B ± 0%      107B ± 4%  -41.21%  (p=0.000 n=8+9)

name      old allocs/op  new allocs/op  delta
Cache-12      5.00 ± 0%      2.00 ± 0%  -60.00%  (p=0.000 n=10+10)

$ benchstat striped.log expiring.log
name      old time/op    new time/op    delta
Cache-12    1.58µs ± 5%    2.02µs ± 3%  +27.34%  (p=0.000 n=10+9)

name      old alloc/op   new alloc/op   delta
Cache-12      288B ± 0%      107B ± 4%  -62.85%  (p=0.000 n=10+9)

name      old allocs/op  new allocs/op  delta
Cache-12      9.00 ± 0%      2.00 ± 0%  -77.78%  (p=0.000 n=10+10)

$ benchstat simple.log striped.log expiring.log
name \ time/op    simple.log   striped.log  expiring.log
Cache-12          2.74µs ± 2%  1.58µs ± 5%   2.02µs ± 3%

name \ alloc/op   simple.log   striped.log  expiring.log
Cache-12            182B ± 0%    288B ± 0%     107B ± 4%

name \ allocs/op  simple.log   striped.log  expiring.log
Cache-12            5.00 ± 0%    9.00 ± 0%     2.00 ± 0%
```

I also naively replaced the LRU cache with the expiring cache in the more
realistic CachedTokenAuthenticator benchmarks:

https://gist.github.com/mikedanese/41192b6eb62106c0758a4f4885bdad53

For token counts that fit in the LRU, the expiring cache does better because
it does not require acquiring an exclusive lock for cache hits. For token
counts that exceed the size of the LRU, the LRU has a massive performance
drop-off. The LRU cache is around 5x slower (with lookups taking 1
millisecond and throttled to max 40 lookups in flight).
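As a quick orientation before those numbers, here is a minimal caller-side sketch of the API this patch adds in k8s.io/apimachinery/pkg/util/cache. It is only illustrative; the key names, cached values, and TTLs are made up and do not come from the real authenticator call sites:

```go
package main

import (
	"context"
	"fmt"
	"time"

	utilcache "k8s.io/apimachinery/pkg/util/cache"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// NewExpiring returns a cache whose GC goroutine must be started explicitly;
	// Run exits when ctx is cancelled.
	c := utilcache.NewExpiring()
	go c.Run(ctx)

	// Every Set carries its own TTL, e.g. a per-token cache lifetime.
	c.Set("token-a", "response-for-a", 10*time.Second)
	c.Set("token-b", "response-for-b", time.Minute)

	// Get only takes a read lock, so concurrent cache hits don't serialize.
	if v, ok := c.Get("token-a"); ok {
		fmt.Println("hit:", v)
	}

	// Entries can also be dropped explicitly.
	c.Delete("token-b")
	fmt.Println("remaining entries:", c.Len())
}
```

The before/after benchstat comparison for the CachedTokenAuthenticator benchmarks follows.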
```
$ benchstat before.log after.log
name                                                   old time/op    new time/op    delta
CachedTokenAuthenticator/tokens=100_threads=256-12     3.60µs ±22%    1.08µs ± 4%  -69.91%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=500_threads=256-12     3.94µs ±19%    1.20µs ± 3%  -69.57%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=2500_threads=256-12    3.07µs ± 6%    1.17µs ± 1%  -61.87%  (p=0.000 n=9+10)
CachedTokenAuthenticator/tokens=12500_threads=256-12   3.16µs ±17%    1.38µs ± 1%  -56.23%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=62500_threads=256-12   15.0µs ± 1%     2.9µs ± 3%  -80.71%  (p=0.000 n=10+10)

name                                                   old alloc/op   new alloc/op   delta
CachedTokenAuthenticator/tokens=100_threads=256-12       337B ± 1%      300B ± 0%  -11.06%  (p=0.000 n=10+8)
CachedTokenAuthenticator/tokens=500_threads=256-12       307B ± 1%      304B ± 0%   -0.96%  (p=0.000 n=9+10)
CachedTokenAuthenticator/tokens=2500_threads=256-12      337B ± 1%      304B ± 0%   -9.79%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=12500_threads=256-12     343B ± 1%      276B ± 0%  -19.58%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=62500_threads=256-12     493B ± 0%      334B ± 0%  -32.12%  (p=0.000 n=10+10)

name                                                   old allocs/op  new allocs/op  delta
CachedTokenAuthenticator/tokens=100_threads=256-12       13.0 ± 0%      11.0 ± 0%  -15.38%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=500_threads=256-12       12.0 ± 0%      11.0 ± 0%   -8.33%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=2500_threads=256-12      13.0 ± 0%      11.0 ± 0%  -15.38%  (p=0.000 n=10+10)
CachedTokenAuthenticator/tokens=12500_threads=256-12     13.0 ± 0%      10.0 ± 0%  -23.08%  (p=0.000 n=9+10)
CachedTokenAuthenticator/tokens=62500_threads=256-12     17.0 ± 0%      12.0 ± 0%  -29.41%  (p=0.000 n=10+10)
```

Benchmarked with changes in #84423

Bugs: #83259 #83375
---
 .../k8s.io/apimachinery/pkg/util/cache/BUILD  |  16 +-
 .../apimachinery/pkg/util/cache/expiring.go   | 208 ++++++++++++++++++
 .../pkg/util/cache/expiring_test.go           | 197 +++++++++++++++++
 staging/src/k8s.io/apiserver/BUILD            |   1 +
 .../src/k8s.io/apiserver/pkg/util/cache/BUILD |  13 ++
 staging/src/k8s.io/cloud-provider/go.sum      |   1 +
 staging/src/k8s.io/kubectl/go.sum             |   1 +
 staging/src/k8s.io/node-api/go.sum            |   1 +
 staging/src/k8s.io/sample-controller/go.sum   |   1 +
 9 files changed, 436 insertions(+), 3 deletions(-)
 create mode 100644 staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go
 create mode 100644 staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go
 create mode 100644 staging/src/k8s.io/apiserver/pkg/util/cache/BUILD

diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD b/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD index 85a1f3d95f1..ff280d456ab 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD @@ -8,20 +8,30 @@ load( go_test( name = "go_default_test", - srcs = ["lruexpirecache_test.go"], + srcs = [ + "expiring_test.go", + "lruexpirecache_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/github.com/golang/groupcache/lru:go_default_library", + "//vendor/github.com/google/uuid:go_default_library", ], ) go_library( name = "go_default_library", - srcs = ["lruexpirecache.go"], + srcs = [ + "expiring.go", + "lruexpirecache.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/cache", importpath = "k8s.io/apimachinery/pkg/util/cache", - deps = ["//vendor/github.com/hashicorp/golang-lru:go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", +
"//vendor/github.com/hashicorp/golang-lru:go_default_library", + ], ) filegroup( diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go new file mode 100644 index 00000000000..e9ebfc877da --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go @@ -0,0 +1,208 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "container/heap" + "context" + "sync" + "time" + + utilclock "k8s.io/apimachinery/pkg/util/clock" +) + +// NewExpiring returns an initialized expiring cache. Users must call +// (*Expiring).Run() to begin the GC goroutine. +func NewExpiring() *Expiring { + return NewExpiringWithClock(utilclock.RealClock{}) +} + +// NewExpiringWithClock is like NewExpiring but allows passing in a custom +// clock for testing. +func NewExpiringWithClock(clock utilclock.Clock) *Expiring { + return &Expiring{ + clock: clock, + cache: make(map[interface{}]entry), + } +} + +// Expiring is a map whose entries expire after a per-entry timeout. +type Expiring struct { + clock utilclock.Clock + + // mu protects the below fields + mu sync.RWMutex + // cache is the internal map that backs the cache. + cache map[interface{}]entry + // generation is used as a cheap resource version for cache entries. Cleanups + // are scheduled with a key and generation. When the cleanup runs, it first + // compares its generation with the current generation of the entry. It + // deletes the entry iff the generation matches. This prevents cleanups + // scheduled for earlier versions of an entry from deleting later versions of + // an entry when Set() is called multiple times with the same key. + // + // The integer value of the generation of an entry is meaningless. + generation uint64 + + heap expiringHeap +} + +type entry struct { + val interface{} + expiry time.Time + generation uint64 +} + +// Get looks up an entry in the cache. +func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) { + c.mu.RLock() + defer c.mu.RUnlock() + e, ok := c.cache[key] + if !ok || c.clock.Now().After(e.expiry) { + return nil, false + } + return e.val, true +} + +// Set sets a key/value/expiry entry in the map, overwriting any previous entry +// with the same key. The entry expires at the given expiry time, but its TTL +// may be lengthened or shortened by additional calls to Set(). +func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) { + expiry := c.clock.Now().Add(ttl) + + c.mu.Lock() + defer c.mu.Unlock() + + c.generation++ + + c.cache[key] = entry{ + val: val, + expiry: expiry, + generation: c.generation, + } + + heap.Push(&c.heap, &expiringHeapEntry{ + key: key, + generation: c.generation, + expiry: expiry, + }) +} + +// Delete deletes an entry in the map. +func (c *Expiring) Delete(key interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + c.del(key, 0) +} + +// del deletes the entry for the given key. 
The generation argument is the +// generation of the entry that should be deleted. If the generation has been +// changed (e.g. if a set has occurred on an existing element but the old +// cleanup still runs), this is a noop. If the generation argument is 0, the +// entry's generation is ignored and the entry is deleted. +// +// del must be called under the write lock. +func (c *Expiring) del(key interface{}, generation uint64) { + e, ok := c.cache[key] + if !ok { + return + } + if generation != 0 && generation != e.generation { + return + } + delete(c.cache, key) +} + +// Len returns the number of items in the cache. +func (c *Expiring) Len() int { + c.mu.RLock() + defer c.mu.RUnlock() + return len(c.cache) +} + +const gcInterval = 50 * time.Millisecond + +// Run runs the GC goroutine. The goroutine exits when the passed-in context is +// cancelled. +func (c *Expiring) Run(ctx context.Context) { + t := c.clock.NewTicker(gcInterval) + defer t.Stop() + for { + select { + case <-t.C(): + c.gc() + case <-ctx.Done(): + return + } + } +} + +func (c *Expiring) gc() { + now := c.clock.Now() + + c.mu.Lock() + defer c.mu.Unlock() + for { + // Return from gc if the heap is empty or the next element is not yet + // expired. + // + // heap[0] is a peek at the next element in the heap, which is not obvious + // from looking at the (*expiringHeap).Pop() implementation below. + // heap.Pop() swaps the first entry with the last entry of the heap, then + // calls (*expiringHeap).Pop() which returns the last element. + if len(c.heap) == 0 || now.Before(c.heap[0].expiry) { + return + } + cleanup := heap.Pop(&c.heap).(*expiringHeapEntry) + c.del(cleanup.key, cleanup.generation) + } +} + +type expiringHeapEntry struct { + key interface{} + generation uint64 + expiry time.Time +} + +// expiringHeap is a min-heap ordered by the expiration time of its entries. The +// expiring cache uses this as a priority queue to efficiently organize entries to +// be garbage collected once they expire. +type expiringHeap []*expiringHeapEntry + +var _ heap.Interface = &expiringHeap{} + +func (cq expiringHeap) Len() int { + return len(cq) +} + +func (cq expiringHeap) Less(i, j int) bool { + return cq[i].expiry.Before(cq[j].expiry) +} + +func (cq expiringHeap) Swap(i, j int) { + cq[i], cq[j] = cq[j], cq[i] +} + +func (cq *expiringHeap) Push(c interface{}) { + *cq = append(*cq, c.(*expiringHeapEntry)) +} + +func (cq *expiringHeap) Pop() interface{} { + c := (*cq)[cq.Len()-1] + *cq = (*cq)[:cq.Len()-1] + return c +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go new file mode 100644 index 00000000000..e3114959113 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package cache + +import ( + "context" + "math/rand" + "sync" + "testing" + "time" + + "github.com/google/uuid" + + utilclock "k8s.io/apimachinery/pkg/util/clock" +) + +func TestExpiringCache(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cache := NewExpiring() + go cache.Run(ctx) + + if result, ok := cache.Get("foo"); ok || result != nil { + t.Errorf("Expected nil, false, got %#v, %v", result, ok) + } + + record1 := "bob" + record2 := "alice" + + // when empty, record is stored + cache.Set("foo", record1, time.Hour) + if result, ok := cache.Get("foo"); !ok || result != record1 { + t.Errorf("Expected %#v, true, got %#v, %v", record1, result, ok) + } + + // newer record overrides + cache.Set("foo", record2, time.Hour) + if result, ok := cache.Get("foo"); !ok || result != record2 { + t.Errorf("Expected %#v, true, got %#v, %v", record2, result, ok) + } + + // delete the current value + cache.Delete("foo") + if result, ok := cache.Get("foo"); ok || result != nil { + t.Errorf("Expected nil, false, got %#v, %v", result, ok) + } +} + +func TestExpiration(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fc := &utilclock.FakeClock{} + c := NewExpiringWithClock(fc) + go c.Run(ctx) + + c.Set("a", "a", time.Second) + + fc.Step(500 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + fc.Step(time.Second) + if _, ok := c.Get("a"); ok { + t.Fatalf("we should not have found a key") + } + + c.Set("a", "a", time.Second) + + fc.Step(500 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + // reset should restart the ttl + c.Set("a", "a", time.Second) + + fc.Step(750 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + // Simulate a race between a reset and cleanup. Assert that del doesn't + // remove the key. + c.Set("a", "a", time.Second) + + c.mu.Lock() + e := c.cache["a"] + e.generation++ + e.expiry = e.expiry.Add(1 * time.Second) + c.cache["a"] = e + c.mu.Unlock() + + fc.Step(1 * time.Second) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } +} + +func BenchmarkExpiringCacheContention(b *testing.B) { + b.Run("evict_probability=100%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 1) + }) + b.Run("evict_probability=10%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 0.1) + }) + b.Run("evict_probability=1%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 0.01) + }) +} + +func benchmarkExpiringCacheContention(b *testing.B, prob float64) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numKeys = 1 << 16 + cache := NewExpiring() + go cache.Run(ctx) + + keys := []string{} + for i := 0; i < numKeys; i++ { + key := uuid.New().String() + keys = append(keys, key) + } + + b.ResetTimer() + + b.SetParallelism(256) + b.RunParallel(func(pb *testing.PB) { + rand := rand.New(rand.NewSource(rand.Int63())) + for pb.Next() { + i := rand.Int31() + key := keys[i%numKeys] + _, ok := cache.Get(key) + if ok { + // on a cache hit, randomly decide with probability prob whether to evict.
+ if rand.Float64() < prob { + cache.Delete(key) + } + } else { + cache.Set(key, struct{}{}, 50*time.Millisecond) + } + } + }) +} + +func TestStressExpiringCache(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + const numKeys = 1 << 16 + cache := NewExpiring() + go cache.Run(ctx) + + keys := []string{} + for i := 0; i < numKeys; i++ { + key := uuid.New().String() + keys = append(keys, key) + } + + var wg sync.WaitGroup + for i := 0; i < 256; i++ { + wg.Add(1) + go func() { + defer wg.Done() + rand := rand.New(rand.NewSource(rand.Int63())) + for { + select { + case <-ctx.Done(): + return + default: + } + key := keys[rand.Intn(numKeys)] + if _, ok := cache.Get(key); !ok { + cache.Set(key, struct{}{}, time.Second) + } + } + }() + } + + wg.Wait() +} diff --git a/staging/src/k8s.io/apiserver/BUILD b/staging/src/k8s.io/apiserver/BUILD index a6b511f8dd9..e2ad5df4d77 100644 --- a/staging/src/k8s.io/apiserver/BUILD +++ b/staging/src/k8s.io/apiserver/BUILD @@ -40,6 +40,7 @@ filegroup( "//staging/src/k8s.io/apiserver/pkg/server:all-srcs", "//staging/src/k8s.io/apiserver/pkg/storage:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/cache:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs", diff --git a/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD b/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD new file mode 100644 index 00000000000..6df04e38cd7 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD @@ -0,0 +1,13 @@ +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/cloud-provider/go.sum b/staging/src/k8s.io/cloud-provider/go.sum index 8ad9a696c8d..425d1930297 100644 --- a/staging/src/k8s.io/cloud-provider/go.sum +++ b/staging/src/k8s.io/cloud-provider/go.sum @@ -51,6 +51,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/kubectl/go.sum b/staging/src/k8s.io/kubectl/go.sum index 012c405cb54..36aa4c26254 100644 --- a/staging/src/k8s.io/kubectl/go.sum +++ b/staging/src/k8s.io/kubectl/go.sum @@ -115,6 +115,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod
h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/node-api/go.sum b/staging/src/k8s.io/node-api/go.sum index c3966220e8d..c18523feeef 100644 --- a/staging/src/k8s.io/node-api/go.sum +++ b/staging/src/k8s.io/node-api/go.sum @@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/sample-controller/go.sum b/staging/src/k8s.io/sample-controller/go.sum index f816d41aa46..e93eaec773f 100644 --- a/staging/src/k8s.io/sample-controller/go.sum +++ b/staging/src/k8s.io/sample-controller/go.sum @@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= From 3f194d5b413daeba93063f4610b9951069eaf13c Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Wed, 6 Nov 2019 16:23:21 -0800 Subject: [PATCH 2/2] migrate token cache to cache.Expiring --- pkg/kubeapiserver/authenticator/config.go | 5 +++-- .../authenticatorfactory/delegating.go | 3 ++- .../token/cache/cache_simple.go | 17 +++++++++------- .../authentication/token/cache/cache_test.go | 17 ++++++++++++---- .../token/cache/cached_token_authenticator.go | 8 ++++---- .../cache/cached_token_authenticator_test.go | 13 ++++++++++-- .../token/webhook/webhook_v1_test.go | 20 +++++++++++++------ .../token/webhook/webhook_v1beta1_test.go | 20 +++++++++++++------ test/integration/auth/auth_test.go | 10 +++++++--- 9 files changed, 78 insertions(+), 35 deletions(-) diff --git a/pkg/kubeapiserver/authenticator/config.go b/pkg/kubeapiserver/authenticator/config.go index 0e27620d603..5a52c22630b 100644 --- a/pkg/kubeapiserver/authenticator/config.go +++ b/pkg/kubeapiserver/authenticator/config.go @@ -17,6 +17,7 @@ limitations under the License. 
package authenticator import ( + "context" "time" "github.com/go-openapi/spec" @@ -192,7 +193,7 @@ func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, er tokenAuth := tokenunion.New(tokenAuthenticators...) // Optionally cache authentication results if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 { - tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL) + tokenAuth = tokencache.New(context.TODO(), tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL) } authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth)) securityDefinitions["BearerToken"] = &spec.SecurityScheme{ @@ -312,5 +313,5 @@ func newWebhookTokenAuthenticator(webhookConfigFile string, version string, ttl return nil, err } - return tokencache.New(webhookTokenAuthenticator, false, ttl, ttl), nil + return tokencache.New(context.TODO(), webhookTokenAuthenticator, false, ttl, ttl), nil } diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go b/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go index b9c7e2e6eee..7b384b6bd05 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go @@ -17,6 +17,7 @@ limitations under the License. package authenticatorfactory import ( + "context" "errors" "time" @@ -83,7 +84,7 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur if err != nil { return nil, nil, err } - cachingTokenAuth := cache.New(tokenAuth, false, c.CacheTTL, c.CacheTTL) + cachingTokenAuth := cache.New(context.TODO(), tokenAuth, false, c.CacheTTL, c.CacheTTL) authenticators = append(authenticators, bearertoken.New(cachingTokenAuth), websocket.NewProtocolAuthenticator(cachingTokenAuth)) securityDefinitions["BearerToken"] = &spec.SecurityScheme{ diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go index 18d5692d7a7..5012097343b 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go @@ -17,22 +17,25 @@ limitations under the License. 
package cache import ( + "context" "time" - lrucache "k8s.io/apimachinery/pkg/util/cache" + utilcache "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/util/clock" ) type simpleCache struct { - lru *lrucache.LRUExpireCache + cache *utilcache.Expiring } -func newSimpleCache(size int, clock clock.Clock) cache { - return &simpleCache{lru: lrucache.NewLRUExpireCacheWithClock(size, clock)} +func newSimpleCache(ctx context.Context, clock clock.Clock) cache { + c := &simpleCache{cache: utilcache.NewExpiringWithClock(clock)} + go c.cache.Run(ctx) + return c } func (c *simpleCache) get(key string) (*cacheRecord, bool) { - record, ok := c.lru.Get(key) + record, ok := c.cache.Get(key) if !ok { return nil, false } @@ -41,9 +44,9 @@ func (c *simpleCache) get(key string) (*cacheRecord, bool) { } func (c *simpleCache) set(key string, value *cacheRecord, ttl time.Duration) { - c.lru.Add(key, value, ttl) + c.cache.Set(key, value, ttl) } func (c *simpleCache) remove(key string) { - c.lru.Remove(key) + c.cache.Delete(key) } diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go index b7fe4cb73cd..1984c4c6b1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "math/rand" "testing" @@ -30,7 +31,9 @@ import ( ) func TestSimpleCache(t *testing.T) { - testCache(newSimpleCache(4096, clock.RealClock{}), t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + testCache(newSimpleCache(ctx, clock.RealClock{}), t) } // Note: the performance profile of this benchmark may not match that in the production. 
@@ -39,16 +42,22 @@ func TestSimpleCache(t *testing.T) { func BenchmarkCacheContentions(b *testing.B) { for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} { b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) { - benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + benchmarkCache(newSimpleCache(ctx, clock.RealClock{}), b, numKeys) }) b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) { - benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), b, numKeys) }) } } func TestStripedCache(t *testing.T) { - testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), t) } func benchmarkCache(cache cache, b *testing.B, numKeys int) { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go index d94866d5baa..19d8e27e120 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go @@ -64,11 +64,11 @@ type cache interface { } // New returns a token authenticator that caches the results of the specified authenticator. A ttl of 0 bypasses the cache. -func New(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { - return newWithClock(authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) +func New(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { + return newWithClock(ctx, authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) } -func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { +func newWithClock(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { randomCacheKey := make([]byte, 32) if _, err := rand.Read(randomCacheKey); err != nil { panic(err) // rand should never fail @@ -86,7 +86,7 @@ func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, // used. Currently we advertise support 5k nodes and 10k // namespaces; a 32k entry cache is therefore a 2x safety // margin. 
- cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(1024, clock) }), + cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock) }), hashPool: &sync.Pool{ New: func() interface{} { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go index c6fb207e3a7..921f079e9c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go @@ -50,7 +50,10 @@ func TestCachedTokenAuthenticator(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) calledWithToken, resultUsers, resultOk, resultErr = []string{}, nil, false, nil a.AuthenticateToken(context.Background(), "bad1") @@ -124,7 +127,10 @@ func TestCachedTokenAuthenticatorWithAudiences(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) resultUsers["audAusertoken1"] = &user.DefaultInfo{Name: "user1"} resultUsers["audBusertoken1"] = &user.DefaultInfo{Name: "user1-different"} @@ -270,6 +276,8 @@ func (s *singleBenchmark) run(b *testing.B) { } func (s *singleBenchmark) bench(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Simulate slowness, qps limit, external service limitation, etc const maxInFlight = 40 chokepoint := make(chan struct{}, maxInFlight) @@ -277,6 +285,7 @@ func (s *singleBenchmark) bench(b *testing.B) { var lookups uint64 a := newWithClock( + ctx, authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { atomic.AddUint64(&lookups, 1) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go index 518cf706301..fa02b605b2d 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go @@ -170,7 +170,7 @@ func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode } // newV1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. 
-func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -203,7 +203,7 @@ func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, return nil, err } - return cache.New(authn, false, cacheTime, cacheTime), nil + return cache.New(ctx, authn, false, cacheTime, cacheTime), nil } func TestV1TLSConfig(t *testing.T) { @@ -259,7 +259,10 @@ func TestV1TLSConfig(t *testing.T) { } defer server.Close() - wh, err := newV1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -482,12 +485,14 @@ func TestV1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } - ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -554,8 +559,11 @@ func TestV1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create an authenticator that caches successful responses "forever" (100 days). - wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go index 5c228b69025..998d6a782e9 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go @@ -172,7 +172,7 @@ func (m *mockV1beta1Service) HTTPStatusCode() int { return m.statusCode } // newV1beta1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. 
-func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1beta1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -205,7 +205,7 @@ func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca [] return nil, err } - return cache.New(authn, false, cacheTime, cacheTime), nil + return cache.New(ctx, authn, false, cacheTime, cacheTime), nil } func TestV1beta1TLSConfig(t *testing.T) { @@ -261,7 +261,10 @@ func TestV1beta1TLSConfig(t *testing.T) { } defer server.Close() - wh, err := newV1beta1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1beta1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -484,12 +487,14 @@ func TestV1beta1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } - ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -556,8 +561,11 @@ func TestV1beta1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create an authenticator that caches successful responses "forever" (100 days). 
- wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) } diff --git a/test/integration/auth/auth_test.go b/test/integration/auth/auth_test.go index 852c2228b69..8029a05568b 100644 --- a/test/integration/auth/auth_test.go +++ b/test/integration/auth/auth_test.go @@ -70,7 +70,7 @@ func getTestTokenAuth() authenticator.Request { return group.NewGroupAdder(bearertoken.New(tokenAuthenticator), []string{user.AllAuthenticated}) } -func getTestWebhookTokenAuth(serverURL string) (authenticator.Request, error) { +func getTestWebhookTokenAuth(ctx context.Context, serverURL string) (authenticator.Request, error) { kubecfgFile, err := ioutil.TempFile("", "webhook-kubecfg") if err != nil { return nil, err @@ -90,7 +90,7 @@ func getTestWebhookTokenAuth(serverURL string) (authenticator.Request, error) { if err != nil { return nil, err } - return bearertoken.New(cache.New(webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil + return bearertoken.New(cache.New(ctx, webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil } func path(resource, namespace, name string) string { @@ -1176,7 +1176,11 @@ func TestReadOnlyAuthorization(t *testing.T) { func TestWebhookTokenAuthenticator(t *testing.T) { authServer := newTestWebhookTokenAuthServer() defer authServer.Close() - authenticator, err := getTestWebhookTokenAuth(authServer.URL) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + authenticator, err := getTestWebhookTokenAuth(ctx, authServer.URL) if err != nil { t.Fatalf("error starting webhook token authenticator server: %v", err) }
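A closing illustrative sketch, not part of the diffs above: it shows the Set/GC race that the generation field documented in expiring.go guards against, using only the API introduced in patch 1. The key, values, and durations are made up:

```go
package main

import (
	"context"
	"fmt"
	"time"

	utilcache "k8s.io/apimachinery/pkg/util/cache"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := utilcache.NewExpiring()
	go c.Run(ctx)

	// The short-lived entry schedules a GC cleanup carrying some generation N.
	c.Set("token", "old-entry", 50*time.Millisecond)
	// Overwriting the same key stores a new entry with generation N+1.
	c.Set("token", "new-entry", time.Hour)

	// When the generation-N cleanup is popped off the heap, del("token", N)
	// compares generations, sees N+1 != N, and leaves the new entry in place.
	time.Sleep(200 * time.Millisecond)
	if v, ok := c.Get("token"); ok {
		fmt.Println("still cached:", v) // "new-entry" survives the stale cleanup
	}
}
```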