Kubelet Credential Provider

Improve concurrency and cache for credential provider

Removed lock from "Provide" as it can be called in parallel
from image puller. To avoid execing for the same image concurrently
wrapped exec in singleflight.

Purging the cache for expried data with 15mins interval only when
a request for credential is made.

KEP:2133

Signed-off-by: Aditi Sharma <adi.sky17@gmail.com>
This commit is contained in:
Aditi Sharma 2021-05-18 13:37:36 +00:00
parent 7b24c7e4a7
commit def93317b4
4 changed files with 285 additions and 34 deletions

1
go.mod
View File

@ -90,6 +90,7 @@ require (
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba

View File

@ -28,10 +28,13 @@ import (
"sync"
"time"
"golang.org/x/sync/singleflight"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
@ -43,7 +46,8 @@ import (
)
const (
globalCacheKey = "global"
globalCacheKey = "global"
cachePurgeInterval = time.Minute * 15
)
var (
@ -116,10 +120,14 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion)
}
clock := clock.RealClock{}
return &pluginProvider{
clock: clock,
matchImages: provider.MatchImages,
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}),
defaultCacheDuration: provider.DefaultCacheDuration.Duration,
lastCachePurge: clock.Now(),
plugin: &execPlugin{
name: provider.Name,
apiVersion: provider.APIVersion,
@ -133,8 +141,12 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
// pluginProvider is the plugin-based implementation of the DockerConfigProvider interface.
type pluginProvider struct {
clock clock.Clock
sync.Mutex
group singleflight.Group
// matchImages defines the matching image URLs this plugin should operate against.
// The plugin provider will not return any credentials for images that do not match
// against this list of match URLs.
@ -149,6 +161,9 @@ type pluginProvider struct {
// plugin is the exec implementation of the credential providing plugin.
plugin Plugin
// lastCachePurge is the last time cache is cleaned for expired entries.
lastCachePurge time.Time
}
// cacheEntry is the cache object that will be stored in cache.Store.
@ -165,12 +180,14 @@ func cacheKeyFunc(obj interface{}) (string, error) {
}
// cacheExpirationPolicy defines implements cache.ExpirationPolicy, determining expiration based on the expiresAt timestamp.
type cacheExpirationPolicy struct{}
type cacheExpirationPolicy struct {
clock clock.Clock
}
// IsExpired returns true if the current time is after cacheEntry.expiresAt, which is determined by the
// cache duration returned from the credential provider plugin response.
func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
return time.Now().After(entry.Obj.(*cacheEntry).expiresAt)
return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt)
}
// Provide returns a credentialprovider.DockerConfig based on the credentials returned
@ -180,9 +197,6 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return credentialprovider.DockerConfig{}
}
p.Lock()
defer p.Unlock()
cachedConfig, found, err := p.getCachedCredentials(image)
if err != nil {
klog.Errorf("Failed to get cached docker config: %v", err)
@ -193,12 +207,27 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return cachedConfig
}
response, err := p.plugin.ExecPlugin(context.Background(), image)
// ExecPlugin is wrapped in single flight to exec plugin once for concurrent same image request.
// The caveat here is we don't know cacheKeyType yet, so if cacheKeyType is registry/global and credentials saved in cache
// on per registry/global basis then exec will be called for all requests if requests are made concurrently.
// foo.bar.registry
// foo.bar.registry/image1
// foo.bar.registry/image2
res, err, _ := p.group.Do(image, func() (interface{}, error) {
return p.plugin.ExecPlugin(context.Background(), image)
})
if err != nil {
klog.Errorf("Failed getting credential from external registry credential provider: %v", err)
return credentialprovider.DockerConfig{}
}
response, ok := res.(*credentialproviderapi.CredentialProviderResponse)
if !ok {
klog.Errorf("Invalid response type returned by external credential provider")
return credentialprovider.DockerConfig{}
}
var cacheKey string
switch cacheKeyType := response.CacheKeyType; cacheKeyType {
case credentialproviderapi.ImagePluginCacheKeyType:
@ -232,10 +261,9 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
if p.defaultCacheDuration == 0 {
return dockerConfig
}
expiresAt = time.Now().Add(p.defaultCacheDuration)
expiresAt = p.clock.Now().Add(p.defaultCacheDuration)
} else {
expiresAt = time.Now().Add(response.CacheDuration.Duration)
expiresAt = p.clock.Now().Add(response.CacheDuration.Duration)
}
cachedEntry := &cacheEntry{
@ -269,6 +297,16 @@ func (p *pluginProvider) isImageAllowed(image string) bool {
// getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin.
func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) {
p.Lock()
if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) {
// NewExpirationCache purges expired entries when List() is called
// The expired entry in the cache is removed only when Get or List called on it.
// List() is called on some interval to remove those expired entries on which Get is never called.
_ = p.cache.List()
p.lastCachePurge = p.clock.Now()
}
p.Unlock()
obj, found, err := p.cache.GetByKey(image)
if err != nil {
return nil, false, err
@ -325,6 +363,8 @@ type execPlugin struct {
// The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and
// return CredentialProviderResponse via stdout.
func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) {
klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name)
authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image}
data, err := e.encodeRequest(authRequest)
if err != nil {
@ -361,7 +401,6 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp
}
data = stdout.Bytes()
// check that the response apiVersion matches what is expected
gvk, err := json.DefaultMetaFactory.Interpret(data)
if err != nil {
@ -369,10 +408,10 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp
}
if gvk.GroupVersion().String() != e.apiVersion {
return nil, errors.New("apiVersion from credential plugin response did not match")
return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String())
}
response, err := e.decodeResponse(stdout.Bytes())
response, err := e.decodeResponse(data)
if err != nil {
// err is explicitly not wrapped since it may contain credentials in the response.
return nil, errors.New("error decoding credential provider plugin response from stdout")

View File

@ -18,12 +18,17 @@ package plugin
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/tools/cache"
credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
credentialproviderv1alpha1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1"
@ -48,6 +53,7 @@ func (f *fakeExecPlugin) ExecPlugin(ctx context.Context, image string) (*credent
}
func Test_Provide(t *testing.T) {
tclock := clock.RealClock{}
testcases := []struct {
name string
pluginProvider *pluginProvider
@ -57,8 +63,10 @@ func Test_Provide(t *testing.T) {
{
name: "exact image match, with Registry cache key",
pluginProvider: &pluginProvider{
matchImages: []string{"test.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -80,8 +88,10 @@ func Test_Provide(t *testing.T) {
{
name: "exact image match, with Image cache key",
pluginProvider: &pluginProvider{
matchImages: []string{"test.registry.io/foo/bar"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test.registry.io/foo/bar"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -103,8 +113,10 @@ func Test_Provide(t *testing.T) {
{
name: "exact image match, with Global cache key",
pluginProvider: &pluginProvider{
matchImages: []string{"test.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -126,8 +138,10 @@ func Test_Provide(t *testing.T) {
{
name: "wild card image match, with Registry cache key",
pluginProvider: &pluginProvider{
matchImages: []string{"*.registry.io:8080"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io:8080"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -149,8 +163,10 @@ func Test_Provide(t *testing.T) {
{
name: "wild card image match, with Image cache key",
pluginProvider: &pluginProvider{
matchImages: []string{"*.*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -172,8 +188,10 @@ func Test_Provide(t *testing.T) {
{
name: "wild card image match, with Global cache key",
pluginProvider: &pluginProvider{
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -195,7 +213,9 @@ func Test_Provide(t *testing.T) {
}
for _, testcase := range testcases {
testcase := testcase
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
dockerconfig := testcase.pluginProvider.Provide(testcase.image)
if !reflect.DeepEqual(dockerconfig, testcase.dockerconfig) {
t.Logf("actual docker config: %v", dockerconfig)
@ -206,6 +226,184 @@ func Test_Provide(t *testing.T) {
}
}
// This test calls Provide in parallel for different registries and images
// The purpose of this is to detect any race conditions while cache rw.
func Test_ProvideParallel(t *testing.T) {
tclock := clock.RealClock{}
testcases := []struct {
name string
registry string
}{
{
name: "provide for registry 1",
registry: "test1.registry.io",
},
{
name: "provide for registry 2",
registry: "test2.registry.io",
},
{
name: "provide for registry 3",
registry: "test3.registry.io",
},
{
name: "provide for registry 4",
registry: "test4.registry.io",
},
}
pluginProvider := &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test1.registry.io", "test2.registry.io", "test3.registry.io", "test4.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheDuration: time.Minute * 1,
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
"test.registry.io": {
Username: "user",
Password: "password",
},
},
},
}
dockerconfig := credentialprovider.DockerConfig{
"test.registry.io": credentialprovider.DockerConfigEntry{
Username: "user",
Password: "password",
},
}
for _, testcase := range testcases {
testcase := testcase
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(w *sync.WaitGroup) {
image := fmt.Sprintf(testcase.registry+"/%s", rand.String(5))
dockerconfigResponse := pluginProvider.Provide(image)
if !reflect.DeepEqual(dockerconfigResponse, dockerconfig) {
t.Logf("actual docker config: %v", dockerconfigResponse)
t.Logf("expected docker config: %v", dockerconfig)
t.Error("unexpected docker config")
}
w.Done()
}(&wg)
}
wg.Wait()
})
}
}
func Test_getCachedCredentials(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
p := &pluginProvider{
clock: fakeClock,
lastCachePurge: fakeClock.Now(),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: fakeClock}),
plugin: &fakeExecPlugin{},
}
testcases := []struct {
name string
step time.Duration
cacheEntry cacheEntry
expectedResponse credentialprovider.DockerConfig
keyLength int
getKey string
}{
{
name: "It should return not expired credential",
step: 1 * time.Second,
keyLength: 1,
getKey: "image1",
expectedResponse: map[string]credentialprovider.DockerConfigEntry{
"image1": {
Username: "user1",
Password: "pass1",
},
},
cacheEntry: cacheEntry{
key: "image1",
expiresAt: fakeClock.Now().Add(1 * time.Minute),
credentials: map[string]credentialprovider.DockerConfigEntry{
"image1": {
Username: "user1",
Password: "pass1",
},
},
},
},
{
name: "It should not return expired credential",
step: 2 * time.Minute,
getKey: "image2",
keyLength: 1,
cacheEntry: cacheEntry{
key: "image2",
expiresAt: fakeClock.Now(),
credentials: map[string]credentialprovider.DockerConfigEntry{
"image2": {
Username: "user2",
Password: "pass2",
},
},
},
},
{
name: "It should delete expired credential during purge",
step: 18 * time.Minute,
keyLength: 0,
// while get call for random, cache purge will be called and it will delete expired
// image3 credentials. We cannot use image3 as getKey here, as it will get deleted during
// get only, we will not be able verify the purge call.
getKey: "random",
cacheEntry: cacheEntry{
key: "image3",
expiresAt: fakeClock.Now().Add(2 * time.Minute),
credentials: map[string]credentialprovider.DockerConfigEntry{
"image3": {
Username: "user3",
Password: "pass3",
},
},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
p.cache.Add(&tc.cacheEntry)
fakeClock.Step(tc.step)
// getCachedCredentials returns unexpired credentials.
res, _, err := p.getCachedCredentials(tc.getKey)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if !reflect.DeepEqual(res, tc.expectedResponse) {
t.Logf("response %v", res)
t.Logf("expected response %v", tc.expectedResponse)
t.Errorf("Unexpected response")
}
// Listkeys returns all the keys present in cache including expired keys.
if len(p.cache.ListKeys()) != tc.keyLength {
t.Errorf("Unexpected cache key length")
}
})
}
}
func Test_encodeRequest(t *testing.T) {
testcases := []struct {
name string
@ -316,9 +514,12 @@ func Test_decodeResponse(t *testing.T) {
}
func Test_RegistryCacheKeyType(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
cacheDuration: time.Hour,
@ -366,9 +567,12 @@ func Test_RegistryCacheKeyType(t *testing.T) {
}
func Test_ImageCacheKeyType(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType,
cacheDuration: time.Hour,
@ -416,9 +620,12 @@ func Test_ImageCacheKeyType(t *testing.T) {
}
func Test_GlobalCacheKeyType(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
cacheDuration: time.Hour,
@ -466,9 +673,12 @@ func Test_GlobalCacheKeyType(t *testing.T) {
}
func Test_NoCacheResponse(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
cacheDuration: 0, // no cache

1
vendor/modules.txt vendored
View File

@ -1023,6 +1023,7 @@ golang.org/x/oauth2/internal
golang.org/x/oauth2/jws
golang.org/x/oauth2/jwt
# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c => golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
## explicit
golang.org/x/sync/singleflight
# golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 => golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
## explicit