From 2c60feffbee690af4632d068158e640abe10f678 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Wed, 14 Jul 2021 16:44:34 -0400 Subject: [PATCH] apiserver: add callback to get notified of object count --- .../pkg/apiserver/customresource_handler.go | 29 +-- .../pkg/cmd/server/options/options.go | 15 +- .../apiserver/pkg/registry/generic/options.go | 10 +- .../pkg/registry/generic/registry/store.go | 14 +- .../src/k8s.io/apiserver/pkg/server/config.go | 11 +- .../apiserver/pkg/server/options/etcd.go | 33 ++-- .../pkg/storage/storagebackend/config.go | 5 + .../request/object_count_tracker.go | 166 ++++++++++++++++++ .../request/object_count_tracker_test.go | 129 ++++++++++++++ 9 files changed, 369 insertions(+), 43 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go index 254f923ca09..068fd8e2ca7 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go @@ -76,6 +76,7 @@ import ( genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/storage/storagebackend" utilfeature "k8s.io/apiserver/pkg/util/feature" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" utilopenapi "k8s.io/apiserver/pkg/util/openapi" "k8s.io/apiserver/pkg/util/webhook" "k8s.io/apiserver/pkg/warning" @@ -1134,23 +1135,25 @@ func (d unstructuredDefaulter) Default(in runtime.Object) { } type CRDRESTOptionsGetter struct { - StorageConfig storagebackend.Config - StoragePrefix string - EnableWatchCache bool - DefaultWatchCacheSize int - EnableGarbageCollection bool - DeleteCollectionWorkers int - CountMetricPollPeriod time.Duration + StorageConfig storagebackend.Config + StoragePrefix string + EnableWatchCache bool + DefaultWatchCacheSize int + EnableGarbageCollection bool + DeleteCollectionWorkers int + CountMetricPollPeriod time.Duration + StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker } func (t CRDRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { ret := generic.RESTOptions{ - StorageConfig: &t.StorageConfig, - Decorator: generic.UndecoratedStorage, - EnableGarbageCollection: t.EnableGarbageCollection, - DeleteCollectionWorkers: t.DeleteCollectionWorkers, - ResourcePrefix: resource.Group + "/" + resource.Resource, - CountMetricPollPeriod: t.CountMetricPollPeriod, + StorageConfig: &t.StorageConfig, + Decorator: generic.UndecoratedStorage, + EnableGarbageCollection: t.EnableGarbageCollection, + DeleteCollectionWorkers: t.DeleteCollectionWorkers, + ResourcePrefix: resource.Group + "/" + resource.Resource, + CountMetricPollPeriod: t.CountMetricPollPeriod, + StorageObjectCountTracker: t.StorageObjectCountTracker, } if t.EnableWatchCache { ret.Decorator = genericregistry.StorageWithCacher() diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go index 3db1027fe66..f4e98e39017 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go @@ -120,13 +120,14 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err // NewCRDRESTOptionsGetter create a RESTOptionsGetter for CustomResources. func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) genericregistry.RESTOptionsGetter { ret := apiserver.CRDRESTOptionsGetter{ - StorageConfig: etcdOptions.StorageConfig, - StoragePrefix: etcdOptions.StorageConfig.Prefix, - EnableWatchCache: etcdOptions.EnableWatchCache, - DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize, - EnableGarbageCollection: etcdOptions.EnableGarbageCollection, - DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers, - CountMetricPollPeriod: etcdOptions.StorageConfig.CountMetricPollPeriod, + StorageConfig: etcdOptions.StorageConfig, + StoragePrefix: etcdOptions.StorageConfig.Prefix, + EnableWatchCache: etcdOptions.EnableWatchCache, + DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize, + EnableGarbageCollection: etcdOptions.EnableGarbageCollection, + DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers, + CountMetricPollPeriod: etcdOptions.StorageConfig.CountMetricPollPeriod, + StorageObjectCountTracker: etcdOptions.StorageConfig.StorageObjectCountTracker, } ret.StorageConfig.Codec = unstructured.UnstructuredJSONScheme diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go index 577192b626e..72b582c3b4b 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/options.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/storagebackend" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/tools/cache" ) @@ -30,10 +31,11 @@ type RESTOptions struct { StorageConfig *storagebackend.Config Decorator StorageDecorator - EnableGarbageCollection bool - DeleteCollectionWorkers int - ResourcePrefix string - CountMetricPollPeriod time.Duration + EnableGarbageCollection bool + DeleteCollectionWorkers int + ResourcePrefix string + CountMetricPollPeriod time.Duration + StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker } // Implement RESTOptionsGetter so that RESTOptions can directly be used when available (i.e. tests) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 7a626855a18..ec9870a035a 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -44,6 +44,7 @@ import ( storeerr "k8s.io/apiserver/pkg/storage/errors" "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/util/dryrun" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/tools/cache" "sigs.k8s.io/structured-merge-diff/v4/fieldpath" @@ -1413,7 +1414,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { e.StorageVersioner = opts.StorageConfig.EncodeVersioner if opts.CountMetricPollPeriod > 0 { - stopFunc := e.startObservingCount(opts.CountMetricPollPeriod) + stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker) previousDestroy := e.DestroyFunc e.DestroyFunc = func() { stopFunc() @@ -1428,7 +1429,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { } // startObservingCount starts monitoring given prefix and periodically updating metrics. It returns a function to stop collection. -func (e *Store) startObservingCount(period time.Duration) func() { +func (e *Store) startObservingCount(period time.Duration, objectCountTracker flowcontrolrequest.StorageObjectCountTracker) func() { prefix := e.KeyRootFunc(genericapirequest.NewContext()) resourceName := e.DefaultQualifiedResource.String() klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "/"+prefix) @@ -1437,9 +1438,12 @@ func (e *Store) startObservingCount(period time.Duration) func() { count, err := e.Storage.Count(prefix) if err != nil { klog.V(5).InfoS("Failed to update storage count metric", "err", err) - metrics.UpdateObjectCount(resourceName, -1) - } else { - metrics.UpdateObjectCount(resourceName, count) + count = -1 + } + + metrics.UpdateObjectCount(resourceName, count) + if objectCountTracker != nil { + objectCountTracker.Set(resourceName, count) } }, period, resourceCountPollPeriodJitter, true, stopCh) return func() { close(stopCh) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index b9a6c54bdfe..95c96f95aec 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -227,6 +227,10 @@ type Config struct { // it's intentionally marked private as it should never be overridden. lifecycleSignals lifecycleSignals + // StorageObjectCountTracker is used to keep track of the total number of objects + // in the storage per resource, so we can estimate width of incoming requests. + StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker + //=========================================================================== // values below here are targets for removal //=========================================================================== @@ -312,6 +316,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config { if feature.DefaultFeatureGate.Enabled(features.APIServerIdentity) { id = "kube-apiserver-" + uuid.New().String() } + lifecycleSignals := newLifecycleSignals() + return &Config{ Serializer: codecs, BuildHandlerChainFunc: DefaultBuildHandlerChain, @@ -349,8 +355,9 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Default to treating watch as a long-running operation // Generic API servers have no inherent long-running subresources - LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), - lifecycleSignals: newLifecycleSignals(), + LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), + lifecycleSignals: lifecycleSignals, + StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(lifecycleSignals.ShutdownInitiated.Signaled()), APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index d8b45b8198f..faab0071924 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -206,6 +206,9 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error { } } + // use the StorageObjectCountTracker interface instance from server.Config + s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker + c.RESTOptionsGetter = &SimpleRestOptionsFactory{ Options: *s, TransformerOverrides: transformerOverrides, @@ -217,6 +220,10 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac if err := s.addEtcdHealthEndpoint(c); err != nil { return err } + + // use the StorageObjectCountTracker interface instance from server.Config + s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker + c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} return nil } @@ -248,12 +255,13 @@ type SimpleRestOptionsFactory struct { func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { ret := generic.RESTOptions{ - StorageConfig: &f.Options.StorageConfig, - Decorator: generic.UndecoratedStorage, - EnableGarbageCollection: f.Options.EnableGarbageCollection, - DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, - ResourcePrefix: resource.Group + "/" + resource.Resource, - CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, + StorageConfig: &f.Options.StorageConfig, + Decorator: generic.UndecoratedStorage, + EnableGarbageCollection: f.Options.EnableGarbageCollection, + DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, + ResourcePrefix: resource.Group + "/" + resource.Resource, + CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, + StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker, } if f.TransformerOverrides != nil { if transformer, ok := f.TransformerOverrides[resource]; ok { @@ -290,12 +298,13 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR } ret := generic.RESTOptions{ - StorageConfig: storageConfig, - Decorator: generic.UndecoratedStorage, - DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, - EnableGarbageCollection: f.Options.EnableGarbageCollection, - ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), - CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, + StorageConfig: storageConfig, + Decorator: generic.UndecoratedStorage, + DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, + EnableGarbageCollection: f.Options.EnableGarbageCollection, + ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), + CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, + StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker, } if f.Options.EnableWatchCache { sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 45cfd2ea185..df08e40776a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -24,6 +24,7 @@ import ( "k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/value" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" ) const ( @@ -84,6 +85,10 @@ type Config struct { HealthcheckTimeout time.Duration LeaseManagerConfig etcd3.LeaseManagerConfig + + // StorageObjectCountTracker is used to keep track of the total + // number of objects in the storage per resource. + StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker } func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go new file mode 100644 index 00000000000..f068500f657 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker.go @@ -0,0 +1,166 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "errors" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" +) + +const ( + // type deletion (it applies mostly to CRD) is not a very frequent + // operation so we can afford to prune the cache at a large interval. + // at the same time, we also want to make sure that the scalability + // tests hit this code path. + pruneInterval = 1 * time.Hour + + // the storage layer polls for object count at every 1m interval, we will allow + // up to 2-3 transient failures to get the latest count for a given resource. + staleTolerationThreshold = 3 * time.Minute +) + +var ( + // ObjectCountNotFoundErr is returned when the object count for + // a given resource is not being tracked. + ObjectCountNotFoundErr = errors.New("object count not found for the given resource") + + // ObjectCountStaleErr is returned when the object count for a + // given resource has gone stale due to transient failures. + ObjectCountStaleErr = errors.New("object count has gone stale for the given resource") +) + +// StorageObjectCountTracker is an interface that is used to keep track of +// of the total number of objects for each resource. +// {group}.{resource} is used as the key name to update and retrieve +// the total number of objects for a given resource. +type StorageObjectCountTracker interface { + // Set is invoked to update the current number of total + // objects for the given resource + Set(string, int64) + + // Get returns the total number of objects for the given resource. + // The following errors are returned: + // - if the count has gone stale for a given resource due to transient + // failures ObjectCountStaleErr is returned. + // - if the given resource is not being tracked then + // ObjectCountNotFoundErr is returned. + Get(string) (int64, error) +} + +// NewStorageObjectCountTracker returns an instance of +// StorageObjectCountTracker interface that can be used to +// keep track of the total number of objects for each resource. +func NewStorageObjectCountTracker(stopCh <-chan struct{}) StorageObjectCountTracker { + tracker := &objectCountTracker{ + clock: &clock.RealClock{}, + counts: map[string]*timestampedCount{}, + } + go func() { + wait.PollUntil( + pruneInterval, + func() (bool, error) { + // always prune at every pruneInterval + return false, tracker.prune(pruneInterval) + }, stopCh) + klog.InfoS("StorageObjectCountTracker pruner is exiting") + }() + + return tracker +} + +// timestampedCount stores the count of a given resource with a last updated +// timestamp so we can prune it after it goes stale for certain threshold. +type timestampedCount struct { + count int64 + lastUpdatedAt time.Time +} + +// objectCountTracker implements StorageObjectCountTracker with +// reader/writer mutual exclusion lock. +type objectCountTracker struct { + clock clock.PassiveClock + + lock sync.RWMutex + counts map[string]*timestampedCount +} + +func (t *objectCountTracker) Set(groupResource string, count int64) { + if count <= -1 { + // a value of -1 indicates that the 'Count' call failed to contact + // the storage layer, in most cases this error can be transient. + // we will continue to work with the count that is in the cache + // up to a certain threshold defined by staleTolerationThreshold. + // in case this becomes a non transient error then the count for + // the given resource will will eventually be removed from + // the cache by the pruner. + return + } + + now := t.clock.Now() + + // lock for writing + t.lock.Lock() + defer t.lock.Unlock() + + if item, ok := t.counts[groupResource]; ok { + item.count = count + item.lastUpdatedAt = now + return + } + + t.counts[groupResource] = ×tampedCount{ + count: count, + lastUpdatedAt: now, + } +} + +func (t *objectCountTracker) Get(groupResource string) (int64, error) { + staleThreshold := t.clock.Now().Add(-staleTolerationThreshold) + + t.lock.RLock() + defer t.lock.RUnlock() + + if item, ok := t.counts[groupResource]; ok { + if item.lastUpdatedAt.Before(staleThreshold) { + return item.count, ObjectCountStaleErr + } + return item.count, nil + } + return 0, ObjectCountNotFoundErr +} + +func (t *objectCountTracker) prune(threshold time.Duration) error { + oldestLastUpdatedAtAllowed := t.clock.Now().Add(-threshold) + + // lock for writing + t.lock.Lock() + defer t.lock.Unlock() + + for groupResource, count := range t.counts { + if count.lastUpdatedAt.After(oldestLastUpdatedAtAllowed) { + continue + } + delete(t.counts, groupResource) + } + + return nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go new file mode 100644 index 00000000000..0e86ce35bb6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/object_count_tracker_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +import ( + "reflect" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/util/clock" +) + +func TestStorageObjectCountTracker(t *testing.T) { + tests := []struct { + name string + lastUpdated time.Duration + count int64 + errExpected error + countExpected int64 + }{ + { + name: "object count not tracked for given resource", + count: -2, + errExpected: ObjectCountNotFoundErr, + }, + { + name: "transient failure", + count: -1, + errExpected: ObjectCountNotFoundErr, + }, + { + name: "object count is zero", + count: 0, + countExpected: 0, + errExpected: nil, + }, + { + name: "object count is more than zero", + count: 799, + countExpected: 799, + errExpected: nil, + }, + { + name: "object count stale", + count: 799, + countExpected: 799, + lastUpdated: staleTolerationThreshold + time.Millisecond, + errExpected: ObjectCountStaleErr, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClock := &clock.FakePassiveClock{} + tracker := &objectCountTracker{ + clock: fakeClock, + counts: map[string]*timestampedCount{}, + } + + key := "foo.bar.resource" + now := time.Now() + fakeClock.SetTime(now.Add(-test.lastUpdated)) + tracker.Set(key, test.count) + + fakeClock.SetTime(now) + countGot, err := tracker.Get(key) + if test.errExpected != err { + t.Errorf("Expected error: %v, but got: %v", test.errExpected, err) + } + if test.countExpected != countGot { + t.Errorf("Expected count: %d, but got: %d", test.countExpected, countGot) + } + if test.count <= -1 && len(tracker.counts) > 0 { + t.Errorf("Expected the cache to be empty, but got: %d", len(tracker.counts)) + } + }) + } +} + +func TestStorageObjectCountTrackerWithPrune(t *testing.T) { + fakeClock := &clock.FakePassiveClock{} + tracker := &objectCountTracker{ + clock: fakeClock, + counts: map[string]*timestampedCount{}, + } + + now := time.Now() + fakeClock.SetTime(now.Add(-61 * time.Minute)) + tracker.Set("k1", 61) + fakeClock.SetTime(now.Add(-60 * time.Minute)) + tracker.Set("k2", 60) + // we are going to prune keys that are stale for >= 1h + // so the above keys are expected to be pruned and the + // key below should not be pruned. + mostRecent := now.Add(-59 * time.Minute) + fakeClock.SetTime(mostRecent) + tracker.Set("k3", 59) + expected := map[string]*timestampedCount{ + "k3": { + count: 59, + lastUpdatedAt: mostRecent, + }, + } + + fakeClock.SetTime(now) + if err := tracker.prune(time.Hour); err != nil { + t.Fatalf("Expected no error, but got: %v", err) + } + + // we expect only one entry in the map, so DeepEqual should work. + if !reflect.DeepEqual(expected, tracker.counts) { + t.Errorf("Expected prune to remove stale entries - diff: %s", cmp.Diff(expected, tracker.counts)) + } +}