diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD b/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD index 85a1f3d95f1..ff280d456ab 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/BUILD @@ -8,20 +8,30 @@ load( go_test( name = "go_default_test", - srcs = ["lruexpirecache_test.go"], + srcs = [ + "expiring_test.go", + "lruexpirecache_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/github.com/golang/groupcache/lru:go_default_library", + "//vendor/github.com/google/uuid:go_default_library", ], ) go_library( name = "go_default_library", - srcs = ["lruexpirecache.go"], + srcs = [ + "expiring.go", + "lruexpirecache.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/util/cache", importpath = "k8s.io/apimachinery/pkg/util/cache", - deps = ["//vendor/github.com/hashicorp/golang-lru:go_default_library"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/github.com/hashicorp/golang-lru:go_default_library", + ], ) filegroup( diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go new file mode 100644 index 00000000000..e9ebfc877da --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go @@ -0,0 +1,208 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "container/heap" + "context" + "sync" + "time" + + utilclock "k8s.io/apimachinery/pkg/util/clock" +) + +// NewExpiring returns an initialized expiring cache. Users must call +// (*Expiring).Run() to begin the GC goroutine. +func NewExpiring() *Expiring { + return NewExpiringWithClock(utilclock.RealClock{}) +} + +// NewExpiringWithClock is like NewExpiring but allows passing in a custom +// clock for testing. +func NewExpiringWithClock(clock utilclock.Clock) *Expiring { + return &Expiring{ + clock: clock, + cache: make(map[interface{}]entry), + } +} + +// Expiring is a map whose entries expire after a per-entry timeout. +type Expiring struct { + clock utilclock.Clock + + // mu protects the below fields + mu sync.RWMutex + // cache is the internal map that backs the cache. + cache map[interface{}]entry + // generation is used as a cheap resource version for cache entries. Cleanups + // are scheduled with a key and generation. When the cleanup runs, it first + // compares its generation with the current generation of the entry. It + // deletes the entry iff the generation matches. This prevents cleanups + // scheduled for earlier versions of an entry from deleting later versions of + // an entry when Set() is called multiple times with the same key. + // + // The integer value of the generation of an entry is meaningless. + generation uint64 + + heap expiringHeap +} + +type entry struct { + val interface{} + expiry time.Time + generation uint64 +} + +// Get looks up an entry in the cache. +func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) { + c.mu.RLock() + defer c.mu.RUnlock() + e, ok := c.cache[key] + if !ok || c.clock.Now().After(e.expiry) { + return nil, false + } + return e.val, true +} + +// Set sets a key/value/expiry entry in the map, overwriting any previous entry +// with the same key. The entry expires at the given expiry time, but its TTL +// may be lengthened or shortened by additional calls to Set(). +func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) { + expiry := c.clock.Now().Add(ttl) + + c.mu.Lock() + defer c.mu.Unlock() + + c.generation++ + + c.cache[key] = entry{ + val: val, + expiry: expiry, + generation: c.generation, + } + + heap.Push(&c.heap, &expiringHeapEntry{ + key: key, + generation: c.generation, + expiry: expiry, + }) +} + +// Delete deletes an entry in the map. +func (c *Expiring) Delete(key interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + c.del(key, 0) +} + +// del deletes the entry for the given key. The generation argument is the +// generation of the entry that should be deleted. If the generation has been +// changed (e.g. if a set has occurred on an existing element but the old +// cleanup still runs), this is a noop. If the generation argument is 0, the +// entry's generation is ignored and the entry is deleted. +// +// del must be called under the write lock. +func (c *Expiring) del(key interface{}, generation uint64) { + e, ok := c.cache[key] + if !ok { + return + } + if generation != 0 && generation != e.generation { + return + } + delete(c.cache, key) +} + +// Len returns the number of items in the cache. +func (c *Expiring) Len() int { + c.mu.RLock() + defer c.mu.RUnlock() + return len(c.cache) +} + +const gcInterval = 50 * time.Millisecond + +// Run runs the GC goroutine. The goroutine exits when the passed in context is +// cancelled. +func (c *Expiring) Run(ctx context.Context) { + t := c.clock.NewTicker(gcInterval) + defer t.Stop() + for { + select { + case <-t.C(): + c.gc() + case <-ctx.Done(): + return + } + } +} + +func (c *Expiring) gc() { + now := c.clock.Now() + + c.mu.Lock() + defer c.mu.Unlock() + for { + // Return from gc if the heap is empty or the next element is not yet + // expired. + // + // heap[0] is a peek at the next element in the heap, which is not obvious + // from looking at the (*expiringHeap).Pop() implmentation below. + // heap.Pop() swaps the first entry with the last entry of the heap, then + // calls (*expiringHeap).Pop() which returns the last element. + if len(c.heap) == 0 || now.After(c.heap[0].expiry) { + return + } + cleanup := heap.Pop(&c.heap).(*expiringHeapEntry) + c.del(cleanup.key, cleanup.generation) + } +} + +type expiringHeapEntry struct { + key interface{} + generation uint64 + expiry time.Time +} + +// expiringHeap is a min-heap ordered by expiration time of it's entries. The +// expiring cache uses this as a priority queue efficiently organize entries to +// be garbage collected once they expire. +type expiringHeap []*expiringHeapEntry + +var _ heap.Interface = &expiringHeap{} + +func (cq expiringHeap) Len() int { + return len(cq) +} + +func (cq expiringHeap) Less(i, j int) bool { + return cq[i].expiry.Before(cq[j].expiry) +} + +func (cq expiringHeap) Swap(i, j int) { + cq[i], cq[j] = cq[j], cq[i] +} + +func (cq *expiringHeap) Push(c interface{}) { + *cq = append(*cq, c.(*expiringHeapEntry)) +} + +func (cq *expiringHeap) Pop() interface{} { + c := (*cq)[cq.Len()-1] + *cq = (*cq)[:cq.Len()-1] + return c +} diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go new file mode 100644 index 00000000000..e3114959113 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go @@ -0,0 +1,197 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "math/rand" + "sync" + "testing" + "time" + + "github.com/google/uuid" + + utilclock "k8s.io/apimachinery/pkg/util/clock" +) + +func TestExpiringCache(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cache := NewExpiring() + go cache.Run(ctx) + + if result, ok := cache.Get("foo"); ok || result != nil { + t.Errorf("Expected null, false, got %#v, %v", result, ok) + } + + record1 := "bob" + record2 := "alice" + + // when empty, record is stored + cache.Set("foo", record1, time.Hour) + if result, ok := cache.Get("foo"); !ok || result != record1 { + t.Errorf("Expected %#v, true, got %#v, %v", record1, result, ok) + } + + // newer record overrides + cache.Set("foo", record2, time.Hour) + if result, ok := cache.Get("foo"); !ok || result != record2 { + t.Errorf("Expected %#v, true, got %#v, %v", record2, result, ok) + } + + // delete the current value + cache.Delete("foo") + if result, ok := cache.Get("foo"); ok || result != nil { + t.Errorf("Expected null, false, got %#v, %v", result, ok) + } +} + +func TestExpiration(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fc := &utilclock.FakeClock{} + c := NewExpiringWithClock(fc) + go c.Run(ctx) + + c.Set("a", "a", time.Second) + + fc.Step(500 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + fc.Step(time.Second) + if _, ok := c.Get("a"); ok { + t.Fatalf("we should not have found a key") + } + + c.Set("a", "a", time.Second) + + fc.Step(500 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + // reset should restart the ttl + c.Set("a", "a", time.Second) + + fc.Step(750 * time.Millisecond) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } + + // Simulate a race between a reset and cleanup. Assert that del doesn't + // remove the key. + c.Set("a", "a", time.Second) + + c.mu.Lock() + e := c.cache["a"] + e.generation++ + e.expiry = e.expiry.Add(1 * time.Second) + c.cache["a"] = e + c.mu.Unlock() + + fc.Step(1 * time.Second) + if _, ok := c.Get("a"); !ok { + t.Fatalf("we should have found a key") + } +} + +func BenchmarkExpiringCacheContention(b *testing.B) { + b.Run("evict_probablility=100%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 1) + }) + b.Run("evict_probablility=10%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 0.1) + }) + b.Run("evict_probablility=1%", func(b *testing.B) { + benchmarkExpiringCacheContention(b, 0.01) + }) +} + +func benchmarkExpiringCacheContention(b *testing.B, prob float64) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numKeys = 1 << 16 + cache := NewExpiring() + go cache.Run(ctx) + + keys := []string{} + for i := 0; i < numKeys; i++ { + key := uuid.New().String() + keys = append(keys, key) + } + + b.ResetTimer() + + b.SetParallelism(256) + b.RunParallel(func(pb *testing.PB) { + rand := rand.New(rand.NewSource(rand.Int63())) + for pb.Next() { + i := rand.Int31() + key := keys[i%numKeys] + _, ok := cache.Get(key) + if ok { + // compare lower bits of sampled i to decide whether we should evict. + if rand.Float64() < prob { + cache.Delete(key) + } + } else { + cache.Set(key, struct{}{}, 50*time.Millisecond) + } + } + }) +} + +func TestStressExpiringCache(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + const numKeys = 1 << 16 + cache := NewExpiring() + go cache.Run(ctx) + + keys := []string{} + for i := 0; i < numKeys; i++ { + key := uuid.New().String() + keys = append(keys, key) + } + + var wg sync.WaitGroup + for i := 0; i < 256; i++ { + wg.Add(1) + go func() { + rand := rand.New(rand.NewSource(rand.Int63())) + for { + select { + case <-ctx.Done(): + return + default: + } + key := keys[rand.Intn(numKeys)] + if _, ok := cache.Get(key); !ok { + cache.Set(key, struct{}{}, time.Second) + } + } + }() + } + + wg.Done() +} diff --git a/staging/src/k8s.io/apiserver/BUILD b/staging/src/k8s.io/apiserver/BUILD index a6b511f8dd9..e2ad5df4d77 100644 --- a/staging/src/k8s.io/apiserver/BUILD +++ b/staging/src/k8s.io/apiserver/BUILD @@ -40,6 +40,7 @@ filegroup( "//staging/src/k8s.io/apiserver/pkg/server:all-srcs", "//staging/src/k8s.io/apiserver/pkg/storage:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/cache:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs", diff --git a/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD b/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD new file mode 100644 index 00000000000..6df04e38cd7 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/cache/BUILD @@ -0,0 +1,13 @@ +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/cloud-provider/go.sum b/staging/src/k8s.io/cloud-provider/go.sum index 8ad9a696c8d..425d1930297 100644 --- a/staging/src/k8s.io/cloud-provider/go.sum +++ b/staging/src/k8s.io/cloud-provider/go.sum @@ -51,6 +51,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/kubectl/go.sum b/staging/src/k8s.io/kubectl/go.sum index 012c405cb54..36aa4c26254 100644 --- a/staging/src/k8s.io/kubectl/go.sum +++ b/staging/src/k8s.io/kubectl/go.sum @@ -115,6 +115,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/node-api/go.sum b/staging/src/k8s.io/node-api/go.sum index c3966220e8d..c18523feeef 100644 --- a/staging/src/k8s.io/node-api/go.sum +++ b/staging/src/k8s.io/node-api/go.sum @@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= diff --git a/staging/src/k8s.io/sample-controller/go.sum b/staging/src/k8s.io/sample-controller/go.sum index f816d41aa46..e93eaec773f 100644 --- a/staging/src/k8s.io/sample-controller/go.sum +++ b/staging/src/k8s.io/sample-controller/go.sum @@ -71,6 +71,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=