From d16dde36a3edf5cdb89c5d5b56d4e3c9af849c1c Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Fri, 15 Nov 2019 17:04:28 -0800 Subject: [PATCH] inline GC in expiring cache This allows us to drop the background goroutine with negligable difference in performance. --- pkg/kubeapiserver/authenticator/config.go | 5 +-- .../apimachinery/pkg/util/cache/expiring.go | 38 ++++++------------- .../pkg/util/cache/expiring_test.go | 15 -------- .../authenticatorfactory/delegating.go | 3 +- .../token/cache/cache_simple.go | 7 +--- .../authentication/token/cache/cache_test.go | 17 ++------- .../token/cache/cached_token_authenticator.go | 8 ++-- .../cache/cached_token_authenticator_test.go | 13 +------ .../token/webhook/webhook_v1_test.go | 20 +++------- .../token/webhook/webhook_v1beta1_test.go | 20 +++------- test/integration/auth/auth_test.go | 10 ++--- 11 files changed, 41 insertions(+), 115 deletions(-) diff --git a/pkg/kubeapiserver/authenticator/config.go b/pkg/kubeapiserver/authenticator/config.go index 5a52c22630b..0e27620d603 100644 --- a/pkg/kubeapiserver/authenticator/config.go +++ b/pkg/kubeapiserver/authenticator/config.go @@ -17,7 +17,6 @@ limitations under the License. package authenticator import ( - "context" "time" "github.com/go-openapi/spec" @@ -193,7 +192,7 @@ func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, er tokenAuth := tokenunion.New(tokenAuthenticators...) // Optionally cache authentication results if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 { - tokenAuth = tokencache.New(context.TODO(), tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL) + tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL) } authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth)) securityDefinitions["BearerToken"] = &spec.SecurityScheme{ @@ -313,5 +312,5 @@ func newWebhookTokenAuthenticator(webhookConfigFile string, version string, ttl return nil, err } - return tokencache.New(context.TODO(), webhookTokenAuthenticator, false, ttl, ttl), nil + return tokencache.New(webhookTokenAuthenticator, false, ttl, ttl), nil } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go index e9ebfc877da..f84cf636b27 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go @@ -18,15 +18,13 @@ package cache import ( "container/heap" - "context" "sync" "time" utilclock "k8s.io/apimachinery/pkg/util/clock" ) -// NewExpiring returns an initialized expiring cache. Users must call -// (*Expiring).Run() to begin the GC goroutine. +// NewExpiring returns an initialized expiring cache. func NewExpiring() *Expiring { return NewExpiringWithClock(utilclock.RealClock{}) } @@ -80,9 +78,13 @@ func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) { // Set sets a key/value/expiry entry in the map, overwriting any previous entry // with the same key. The entry expires at the given expiry time, but its TTL -// may be lengthened or shortened by additional calls to Set(). +// may be lengthened or shortened by additional calls to Set(). Garbage +// collection of expired entries occurs during calls to Set(), however calls to +// Get() will not return expired entries that have not yet been garbage +// collected. func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) { - expiry := c.clock.Now().Add(ttl) + now := c.clock.Now() + expiry := now.Add(ttl) c.mu.Lock() defer c.mu.Unlock() @@ -95,6 +97,9 @@ func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) { generation: c.generation, } + // Run GC inline before pushing the new entry. + c.gc(now) + heap.Push(&c.heap, &expiringHeapEntry{ key: key, generation: c.generation, @@ -134,28 +139,7 @@ func (c *Expiring) Len() int { return len(c.cache) } -const gcInterval = 50 * time.Millisecond - -// Run runs the GC goroutine. The goroutine exits when the passed in context is -// cancelled. -func (c *Expiring) Run(ctx context.Context) { - t := c.clock.NewTicker(gcInterval) - defer t.Stop() - for { - select { - case <-t.C(): - c.gc() - case <-ctx.Done(): - return - } - } -} - -func (c *Expiring) gc() { - now := c.clock.Now() - - c.mu.Lock() - defer c.mu.Unlock() +func (c *Expiring) gc(now time.Time) { for { // Return from gc if the heap is empty or the next element is not yet // expired. diff --git a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go index e3114959113..7b834b33f40 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring_test.go @@ -29,11 +29,7 @@ import ( ) func TestExpiringCache(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cache := NewExpiring() - go cache.Run(ctx) if result, ok := cache.Get("foo"); ok || result != nil { t.Errorf("Expected null, false, got %#v, %v", result, ok) @@ -62,12 +58,8 @@ func TestExpiringCache(t *testing.T) { } func TestExpiration(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - fc := &utilclock.FakeClock{} c := NewExpiringWithClock(fc) - go c.Run(ctx) c.Set("a", "a", time.Second) @@ -100,12 +92,10 @@ func TestExpiration(t *testing.T) { // remove the key. c.Set("a", "a", time.Second) - c.mu.Lock() e := c.cache["a"] e.generation++ e.expiry = e.expiry.Add(1 * time.Second) c.cache["a"] = e - c.mu.Unlock() fc.Step(1 * time.Second) if _, ok := c.Get("a"); !ok { @@ -126,12 +116,8 @@ func BenchmarkExpiringCacheContention(b *testing.B) { } func benchmarkExpiringCacheContention(b *testing.B, prob float64) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - const numKeys = 1 << 16 cache := NewExpiring() - go cache.Run(ctx) keys := []string{} for i := 0; i < numKeys; i++ { @@ -166,7 +152,6 @@ func TestStressExpiringCache(t *testing.T) { const numKeys = 1 << 16 cache := NewExpiring() - go cache.Run(ctx) keys := []string{} for i := 0; i < numKeys; i++ { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go b/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go index 7b384b6bd05..b9c7e2e6eee 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory/delegating.go @@ -17,7 +17,6 @@ limitations under the License. package authenticatorfactory import ( - "context" "errors" "time" @@ -84,7 +83,7 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur if err != nil { return nil, nil, err } - cachingTokenAuth := cache.New(context.TODO(), tokenAuth, false, c.CacheTTL, c.CacheTTL) + cachingTokenAuth := cache.New(tokenAuth, false, c.CacheTTL, c.CacheTTL) authenticators = append(authenticators, bearertoken.New(cachingTokenAuth), websocket.NewProtocolAuthenticator(cachingTokenAuth)) securityDefinitions["BearerToken"] = &spec.SecurityScheme{ diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go index 5012097343b..8e0520af169 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_simple.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "time" utilcache "k8s.io/apimachinery/pkg/util/cache" @@ -28,10 +27,8 @@ type simpleCache struct { cache *utilcache.Expiring } -func newSimpleCache(ctx context.Context, clock clock.Clock) cache { - c := &simpleCache{cache: utilcache.NewExpiringWithClock(clock)} - go c.cache.Run(ctx) - return c +func newSimpleCache(clock clock.Clock) cache { + return &simpleCache{cache: utilcache.NewExpiringWithClock(clock)} } func (c *simpleCache) get(key string) (*cacheRecord, bool) { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go index 1984c4c6b1f..bd0457ac6ca 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cache_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "context" "fmt" "math/rand" "testing" @@ -31,9 +30,7 @@ import ( ) func TestSimpleCache(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - testCache(newSimpleCache(ctx, clock.RealClock{}), t) + testCache(newSimpleCache(clock.RealClock{}), t) } // Note: the performance profile of this benchmark may not match that in the production. @@ -42,22 +39,16 @@ func TestSimpleCache(t *testing.T) { func BenchmarkCacheContentions(b *testing.B) { for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} { b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - benchmarkCache(newSimpleCache(ctx, clock.RealClock{}), b, numKeys) + benchmarkCache(newSimpleCache(clock.RealClock{}), b, numKeys) }) b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), b, numKeys) + benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(clock.RealClock{}) }), b, numKeys) }) } } func TestStripedCache(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), t) + testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(clock.RealClock{}) }), t) } func benchmarkCache(cache cache, b *testing.B, numKeys int) { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go index 19d8e27e120..ef0a8c87215 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go @@ -64,11 +64,11 @@ type cache interface { } // New returns a token authenticator that caches the results of the specified authenticator. A ttl of 0 bypasses the cache. -func New(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { - return newWithClock(ctx, authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) +func New(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { + return newWithClock(authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) } -func newWithClock(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { +func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { randomCacheKey := make([]byte, 32) if _, err := rand.Read(randomCacheKey); err != nil { panic(err) // rand should never fail @@ -86,7 +86,7 @@ func newWithClock(ctx context.Context, authenticator authenticator.Token, cacheE // used. Currently we advertise support 5k nodes and 10k // namespaces; a 32k entry cache is therefore a 2x safety // margin. - cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock) }), + cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(clock) }), hashPool: &sync.Pool{ New: func() interface{} { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go index 921f079e9c7..c6fb207e3a7 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go @@ -50,10 +50,7 @@ func TestCachedTokenAuthenticator(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) + a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) calledWithToken, resultUsers, resultOk, resultErr = []string{}, nil, false, nil a.AuthenticateToken(context.Background(), "bad1") @@ -127,10 +124,7 @@ func TestCachedTokenAuthenticatorWithAudiences(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) + a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) resultUsers["audAusertoken1"] = &user.DefaultInfo{Name: "user1"} resultUsers["audBusertoken1"] = &user.DefaultInfo{Name: "user1-different"} @@ -276,8 +270,6 @@ func (s *singleBenchmark) run(b *testing.B) { } func (s *singleBenchmark) bench(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // Simulate slowness, qps limit, external service limitation, etc const maxInFlight = 40 chokepoint := make(chan struct{}, maxInFlight) @@ -285,7 +277,6 @@ func (s *singleBenchmark) bench(b *testing.B) { var lookups uint64 a := newWithClock( - ctx, authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { atomic.AddUint64(&lookups, 1) diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go index fa02b605b2d..518cf706301 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go @@ -170,7 +170,7 @@ func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode } // newV1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. -func newV1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -203,7 +203,7 @@ func newV1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, return nil, err } - return cache.New(ctx, authn, false, cacheTime, cacheTime), nil + return cache.New(authn, false, cacheTime, cacheTime), nil } func TestV1TLSConfig(t *testing.T) { @@ -259,10 +259,7 @@ func TestV1TLSConfig(t *testing.T) { } defer server.Close() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - wh, err := newV1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + wh, err := newV1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -485,14 +482,12 @@ func TestV1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } + ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -559,11 +554,8 @@ func TestV1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Create an authenticator that caches successful responses "forever" (100 days). - wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go index 998d6a782e9..5c228b69025 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go @@ -172,7 +172,7 @@ func (m *mockV1beta1Service) HTTPStatusCode() int { return m.statusCode } // newV1beta1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. -func newV1beta1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -205,7 +205,7 @@ func newV1beta1TokenAuthenticator(ctx context.Context, serverURL string, clientC return nil, err } - return cache.New(ctx, authn, false, cacheTime, cacheTime), nil + return cache.New(authn, false, cacheTime, cacheTime), nil } func TestV1beta1TLSConfig(t *testing.T) { @@ -261,10 +261,7 @@ func TestV1beta1TLSConfig(t *testing.T) { } defer server.Close() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - wh, err := newV1beta1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + wh, err := newV1beta1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -487,14 +484,12 @@ func TestV1beta1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } + ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -561,11 +556,8 @@ func TestV1beta1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Create an authenticator that caches successful responses "forever" (100 days). - wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) } diff --git a/test/integration/auth/auth_test.go b/test/integration/auth/auth_test.go index 8029a05568b..852c2228b69 100644 --- a/test/integration/auth/auth_test.go +++ b/test/integration/auth/auth_test.go @@ -70,7 +70,7 @@ func getTestTokenAuth() authenticator.Request { return group.NewGroupAdder(bearertoken.New(tokenAuthenticator), []string{user.AllAuthenticated}) } -func getTestWebhookTokenAuth(ctx context.Context, serverURL string) (authenticator.Request, error) { +func getTestWebhookTokenAuth(serverURL string) (authenticator.Request, error) { kubecfgFile, err := ioutil.TempFile("", "webhook-kubecfg") if err != nil { return nil, err @@ -90,7 +90,7 @@ func getTestWebhookTokenAuth(ctx context.Context, serverURL string) (authenticat if err != nil { return nil, err } - return bearertoken.New(cache.New(ctx, webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil + return bearertoken.New(cache.New(webhookTokenAuth, false, 2*time.Minute, 2*time.Minute)), nil } func path(resource, namespace, name string) string { @@ -1176,11 +1176,7 @@ func TestReadOnlyAuthorization(t *testing.T) { func TestWebhookTokenAuthenticator(t *testing.T) { authServer := newTestWebhookTokenAuthServer() defer authServer.Close() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - authenticator, err := getTestWebhookTokenAuth(ctx, authServer.URL) + authenticator, err := getTestWebhookTokenAuth(authServer.URL) if err != nil { t.Fatalf("error starting webhook token authenticator server: %v", err) }