Update credential provider plugin to support using service account token

Signed-off-by: Anish Ramasekar <anish.ramasekar@gmail.com>
This commit is contained in:
Anish Ramasekar 2024-10-31 10:02:13 -07:00
parent 6defd8c0bd
commit ad8666ce88
No known key found for this signature in database
GPG Key ID: E96F745A34A409C2
19 changed files with 1778 additions and 379 deletions

View File

@ -22,7 +22,9 @@ import (
"strings"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
credentialproviderv1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1"
"k8s.io/kubernetes/pkg/credentialprovider"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
@ -70,7 +72,7 @@ func decode(data []byte) (*kubeletconfig.CredentialProviderConfig, error) {
}
// validateCredentialProviderConfig validates CredentialProviderConfig.
func validateCredentialProviderConfig(config *kubeletconfig.CredentialProviderConfig) field.ErrorList {
func validateCredentialProviderConfig(config *kubeletconfig.CredentialProviderConfig, saTokenForCredentialProviders bool) field.ErrorList {
allErrs := field.ErrorList{}
if len(config.Providers) == 0 {
@ -125,7 +127,56 @@ func validateCredentialProviderConfig(config *kubeletconfig.CredentialProviderCo
if provider.DefaultCacheDuration != nil && provider.DefaultCacheDuration.Duration < 0 {
allErrs = append(allErrs, field.Invalid(fieldPath.Child("defaultCacheDuration"), provider.DefaultCacheDuration.Duration, "defaultCacheDuration must be greater than or equal to 0"))
}
if provider.TokenAttributes != nil {
fldPath := fieldPath.Child("tokenAttributes")
if !saTokenForCredentialProviders {
allErrs = append(allErrs, field.Forbidden(fldPath, "tokenAttributes is not supported when KubeletServiceAccountTokenForCredentialProviders feature gate is disabled"))
}
if len(provider.TokenAttributes.ServiceAccountTokenAudience) == 0 {
allErrs = append(allErrs, field.Required(fldPath.Child("serviceAccountTokenAudience"), "serviceAccountTokenAudience is required"))
}
if provider.TokenAttributes.RequireServiceAccount == nil {
allErrs = append(allErrs, field.Required(fldPath.Child("requireServiceAccount"), "requireServiceAccount is required"))
}
if provider.APIVersion != credentialproviderv1.SchemeGroupVersion.String() {
allErrs = append(allErrs, field.Forbidden(fldPath, fmt.Sprintf("tokenAttributes is only supported for %s API version", credentialproviderv1.SchemeGroupVersion.String())))
}
if provider.TokenAttributes.RequireServiceAccount != nil && !*provider.TokenAttributes.RequireServiceAccount && len(provider.TokenAttributes.RequiredServiceAccountAnnotationKeys) > 0 {
allErrs = append(allErrs, field.Forbidden(fldPath.Child("requiredServiceAccountAnnotationKeys"), "requireServiceAccount cannot be false when requiredServiceAccountAnnotationKeys is set"))
}
allErrs = append(allErrs, validateServiceAccountAnnotationKeys(fldPath.Child("requiredServiceAccountAnnotationKeys"), provider.TokenAttributes.RequiredServiceAccountAnnotationKeys)...)
allErrs = append(allErrs, validateServiceAccountAnnotationKeys(fldPath.Child("optionalServiceAccountAnnotationKeys"), provider.TokenAttributes.OptionalServiceAccountAnnotationKeys)...)
requiredServiceAccountAnnotationKeys := sets.New[string](provider.TokenAttributes.RequiredServiceAccountAnnotationKeys...)
optionalServiceAccountAnnotationKeys := sets.New[string](provider.TokenAttributes.OptionalServiceAccountAnnotationKeys...)
duplicateAnnotationKeys := requiredServiceAccountAnnotationKeys.Intersection(optionalServiceAccountAnnotationKeys)
if duplicateAnnotationKeys.Len() > 0 {
allErrs = append(allErrs, field.Invalid(fldPath, sets.List(duplicateAnnotationKeys), "annotation keys cannot be both required and optional"))
}
}
}
return allErrs
}
// validateServiceAccountAnnotationKeys validates the service account annotation keys.
func validateServiceAccountAnnotationKeys(fldPath *field.Path, keys []string) field.ErrorList {
allErrs := field.ErrorList{}
seenAnnotationKeys := sets.New[string]()
// Using the validation logic for keys from https://github.com/kubernetes/kubernetes/blob/69dbc74417304328a9fd3c161643dc4f0a057f41/staging/src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go#L46-L51
for _, k := range keys {
// The rule is QualifiedName except that case doesn't matter, so convert to lowercase before checking.
for _, msg := range validation.IsQualifiedName(strings.ToLower(k)) {
allErrs = append(allErrs, field.Invalid(fldPath, k, msg))
}
if seenAnnotationKeys.Has(k) {
allErrs = append(allErrs, field.Duplicate(fldPath, k))
}
seenAnnotationKeys.Insert(k)
}
return allErrs
}

View File

@ -19,16 +19,17 @@ package plugin
import (
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
utiltesting "k8s.io/client-go/util/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/utils/ptr"
)
func Test_readCredentialProviderConfigFile(t *testing.T) {
@ -337,6 +338,48 @@ providers:
config: nil,
expectErr: `strict decoding error: unknown field "providers[0].unknownField"`,
},
{
name: "v1alpha1 config with token attributes should fail",
configData: `---
kind: CredentialProviderConfig
apiVersion: kubelet.config.k8s.io/v1alpha1
providers:
- name: test
matchImages:
- "registry.io/foobar"
defaultCacheDuration: 10m
apiVersion: credentialprovider.kubelet.k8s.io/v1alpha1
tokenAttributes:
serviceAccountTokenAudience: audience
args:
- --v=5
env:
- name: FOO
value: BAR`,
config: nil,
expectErr: `strict decoding error: unknown field "providers[0].tokenAttributes"`,
},
{
name: "v1beta1 config with token attributes should fail",
configData: `---
kind: CredentialProviderConfig
apiVersion: kubelet.config.k8s.io/v1beta1
providers:
- name: test
matchImages:
- "registry.io/foobar"
defaultCacheDuration: 10m
apiVersion: credentialprovider.kubelet.k8s.io/v1beta1
tokenAttributes:
serviceAccountTokenAudience: audience
args:
- --v=5
env:
- name: FOO
value: BAR`,
config: nil,
expectErr: `strict decoding error: unknown field "providers[0].tokenAttributes"`,
},
}
for _, testcase := range testcases {
@ -347,17 +390,19 @@ providers:
}
defer utiltesting.CloseAndRemove(t, file)
_, err = file.WriteString(testcase.configData)
if err != nil {
if _, err = file.WriteString(testcase.configData); err != nil {
t.Fatal(err)
}
authConfig, err := readCredentialProviderConfigFile(file.Name())
if err != nil && len(testcase.expectErr) == 0 {
if err != nil {
if len(testcase.expectErr) == 0 {
t.Fatal(err)
}
if err == nil && len(testcase.expectErr) > 0 {
if !strings.Contains(err.Error(), testcase.expectErr) {
t.Fatalf("expected error %q but got %q", testcase.expectErr, err.Error())
}
} else if len(testcase.expectErr) > 0 {
t.Fatalf("expected error %q but got none", testcase.expectErr)
}
@ -554,11 +599,248 @@ func Test_validateCredentialProviderConfig(t *testing.T) {
},
},
},
{
name: "token attributes set without KubeletServiceAccountTokenForCredentialProviders feature gate enabled",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
},
},
},
},
expectErr: `providers.tokenAttributes: Forbidden: tokenAttributes is not supported when KubeletServiceAccountTokenForCredentialProviders feature gate is disabled`,
},
{
name: "token attributes not nil but empty ServiceAccountTokenAudience",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
RequiredServiceAccountAnnotationKeys: []string{"prefix.io/annotation-1", "prefix.io/annotation-2"},
RequireServiceAccount: ptr.To(true),
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes.serviceAccountTokenAudience: Required value: serviceAccountTokenAudience is required`,
},
{
name: "token attributes not nil but empty ServiceAccountTokenRequired",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequiredServiceAccountAnnotationKeys: []string{"prefix.io/annotation-1", "prefix.io/annotation-2"},
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes.requireServiceAccount: Required value: requireServiceAccount is required`,
},
{
name: "required service account annotation keys not qualified name (same validation as metav1.ObjectMeta)",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
RequiredServiceAccountAnnotationKeys: []string{"cantendwithadash-", "now-with-dashes/simple"}, // first key is invalid
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes.requiredServiceAccountAnnotationKeys: Invalid value: "cantendwithadash-": name part must consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyName', or 'my.name', or '123-abc', regex used for validation is '([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]')`,
},
{
name: "optional service account annotation keys not qualified name (same validation as metav1.ObjectMeta)",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
OptionalServiceAccountAnnotationKeys: []string{"cantendwithadash-", "now-with-dashes/simple"}, // first key is invalid
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes.optionalServiceAccountAnnotationKeys: Invalid value: "cantendwithadash-": name part must consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyName', or 'my.name', or '123-abc', regex used for validation is '([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]')`,
},
{
name: "duplicate required service account annotation keys",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
RequiredServiceAccountAnnotationKeys: []string{"now-with-dashes/simple", "now-with-dashes/simple"},
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes.requiredServiceAccountAnnotationKeys: Duplicate value: "now-with-dashes/simple"`,
},
{
name: "duplicate optional service account annotation keys",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
OptionalServiceAccountAnnotationKeys: []string{"now-with-dashes/simple", "now-with-dashes/simple"},
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes.optionalServiceAccountAnnotationKeys: Duplicate value: "now-with-dashes/simple"`,
},
{
name: "annotation key in required and optional keys",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
RequiredServiceAccountAnnotationKeys: []string{"now-with-dashes/simple-1", "now-with-dashes/simple-2"},
OptionalServiceAccountAnnotationKeys: []string{"now-with-dashes/simple-2", "now-with-dashes/simple-3"},
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes: Invalid value: []string{"now-with-dashes/simple-2"}: annotation keys cannot be both required and optional`,
},
{
name: "required annotation keys set when requireServiceAccount is false",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(false),
RequiredServiceAccountAnnotationKeys: []string{"now-with-dashes/simple-1", "now-with-dashes/simple-2"},
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes.requiredServiceAccountAnnotationKeys: Forbidden: requireServiceAccount cannot be false when requiredServiceAccountAnnotationKeys is set`,
},
{
name: "valid config with KubeletServiceAccountTokenForCredentialProviders feature gate enabled",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
RequiredServiceAccountAnnotationKeys: []string{"now-with-dashes/simple-1", "now-with-dashes/simple-2"},
OptionalServiceAccountAnnotationKeys: []string{"now-with-dashes/simple-3"},
},
},
},
},
saTokenForCredentialProviders: true,
},
{
name: "tokenAttributes set with credentialprovider.kubelet.k8s.io/v1alpha1 APIVersion",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1alpha1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
RequiredServiceAccountAnnotationKeys: []string{"now-with-dashes/simple"},
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes: Forbidden: tokenAttributes is only supported for credentialprovider.kubelet.k8s.io/v1 API version`,
},
{
name: "tokenAttributes set with credentialprovider.kubelet.k8s.io/v1beta1 APIVersion",
config: &kubeletconfig.CredentialProviderConfig{
Providers: []kubeletconfig.CredentialProvider{
{
Name: "foobar",
MatchImages: []string{"foobar.registry.io"},
DefaultCacheDuration: &metav1.Duration{Duration: time.Minute},
APIVersion: "credentialprovider.kubelet.k8s.io/v1beta1",
TokenAttributes: &kubeletconfig.ServiceAccountTokenAttributes{
ServiceAccountTokenAudience: "audience",
RequireServiceAccount: ptr.To(true),
RequiredServiceAccountAnnotationKeys: []string{"now-with-dashes/simple"},
},
},
},
},
saTokenForCredentialProviders: true,
expectErr: `providers.tokenAttributes: Forbidden: tokenAttributes is only supported for credentialprovider.kubelet.k8s.io/v1 API version`,
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
errs := validateCredentialProviderConfig(testcase.config).ToAggregate()
errs := validateCredentialProviderConfig(testcase.config, testcase.saTokenForCredentialProviders).ToAggregate()
if d := cmp.Diff(testcase.expectErr, errString(errs)); d != "" {
t.Fatalf("CredentialProviderConfig validation mismatch (-want +got):\n%s", d)
}

View File

@ -19,6 +19,7 @@ package plugin
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"os"
@ -28,12 +29,18 @@ import (
"sync"
"time"
"golang.org/x/crypto/cryptobyte"
"golang.org/x/sync/singleflight"
authenticationv1 "k8s.io/api/authentication/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
@ -42,6 +49,7 @@ import (
credentialproviderv1alpha1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1"
credentialproviderv1beta1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1beta1"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletconfigv1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1"
kubeletconfigv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1alpha1"
@ -75,7 +83,10 @@ func init() {
// RegisterCredentialProviderPlugins is called from kubelet to register external credential provider
// plugins according to the CredentialProviderConfig config file.
func RegisterCredentialProviderPlugins(pluginConfigFile, pluginBinDir string) error {
func RegisterCredentialProviderPlugins(pluginConfigFile, pluginBinDir string,
getServiceAccountToken func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error),
getServiceAccount func(namespace, name string) (*v1.ServiceAccount, error),
) error {
if _, err := os.Stat(pluginBinDir); err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("plugin binary directory %s did not exist", pluginBinDir)
@ -89,8 +100,8 @@ func RegisterCredentialProviderPlugins(pluginConfigFile, pluginBinDir string) er
return err
}
errs := validateCredentialProviderConfig(credentialProviderConfig)
if len(errs) > 0 {
saTokenForCredentialProvidersFeatureEnabled := utilfeature.DefaultFeatureGate.Enabled(features.KubeletServiceAccountTokenForCredentialProviders)
if errs := validateCredentialProviderConfig(credentialProviderConfig, saTokenForCredentialProvidersFeatureEnabled); len(errs) > 0 {
return fmt.Errorf("failed to validate credential provider config: %v", errs.ToAggregate())
}
@ -109,19 +120,22 @@ func RegisterCredentialProviderPlugins(pluginConfigFile, pluginBinDir string) er
return fmt.Errorf("error inspecting binary executable %s: %w", pluginBin, err)
}
plugin, err := newPluginProvider(pluginBinDir, provider)
plugin, err := newPluginProvider(pluginBinDir, provider, getServiceAccountToken, getServiceAccount)
if err != nil {
return fmt.Errorf("error initializing plugin provider %s: %w", provider.Name, err)
}
credentialprovider.RegisterCredentialProvider(provider.Name, plugin)
registerCredentialProviderPlugin(provider.Name, plugin)
}
return nil
}
// newPluginProvider returns a new pluginProvider based on the credential provider config.
func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialProvider) (*pluginProvider, error) {
func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialProvider,
getServiceAccountToken func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error),
getServiceAccount func(namespace, name string) (*v1.ServiceAccount, error),
) (*pluginProvider, error) {
mediaType := "application/json"
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
if !ok {
@ -134,7 +148,6 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
}
clock := clock.RealClock{}
return &pluginProvider{
clock: clock,
matchImages: provider.MatchImages,
@ -150,6 +163,7 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
envVars: provider.Env,
environ: os.Environ,
},
serviceAccountProvider: newServiceAccountProvider(provider, getServiceAccount, getServiceAccountToken),
}, nil
}
@ -178,6 +192,101 @@ type pluginProvider struct {
// lastCachePurge is the last time cache is cleaned for expired entries.
lastCachePurge time.Time
// serviceAccountProvider holds the logic for handling service account tokens when needed.
serviceAccountProvider *serviceAccountProvider
}
type serviceAccountProvider struct {
audience string
requireServiceAccount bool
getServiceAccountFunc func(namespace, name string) (*v1.ServiceAccount, error)
getServiceAccountTokenFunc func(podNamespace, serviceAccountName string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error)
requiredServiceAccountAnnotationKeys []string
optionalServiceAccountAnnotationKeys []string
}
func newServiceAccountProvider(
provider kubeletconfig.CredentialProvider,
getServiceAccount func(namespace, name string) (*v1.ServiceAccount, error),
getServiceAccountToken func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error),
) *serviceAccountProvider {
featureGateEnabled := utilfeature.DefaultFeatureGate.Enabled(features.KubeletServiceAccountTokenForCredentialProviders)
serviceAccountTokenAudienceSet := provider.TokenAttributes != nil && len(provider.TokenAttributes.ServiceAccountTokenAudience) > 0
if !featureGateEnabled || !serviceAccountTokenAudienceSet {
return nil
}
return &serviceAccountProvider{
audience: provider.TokenAttributes.ServiceAccountTokenAudience,
requireServiceAccount: *provider.TokenAttributes.RequireServiceAccount,
getServiceAccountFunc: getServiceAccount,
getServiceAccountTokenFunc: getServiceAccountToken,
requiredServiceAccountAnnotationKeys: provider.TokenAttributes.RequiredServiceAccountAnnotationKeys,
optionalServiceAccountAnnotationKeys: provider.TokenAttributes.OptionalServiceAccountAnnotationKeys,
}
}
type requiredAnnotationNotFoundError string
func (e requiredAnnotationNotFoundError) Error() string {
return fmt.Sprintf("required annotation %s not found", string(e))
}
// getServiceAccountData returns the service account UID and required annotations for the service account.
// If the service account does not exist, an error is returned.
// saAnnotations is a map of annotation keys and values that the plugin requires to generate credentials
// that's defined in the tokenAttributes in the credential provider config.
// requiredServiceAccountAnnotationKeys are the keys that are required to be present in the service account.
// If any of the keys defined in this list are not present in the service account, kubelet will not invoke the plugin
// and will return an error.
// optionalServiceAccountAnnotationKeys are the keys that are optional to be present in the service account.
// If present, they will be added to the saAnnotations map.
func (s *serviceAccountProvider) getServiceAccountData(namespace, name string) (types.UID, map[string]string, error) {
sa, err := s.getServiceAccountFunc(namespace, name)
if err != nil {
return "", nil, err
}
saAnnotations := make(map[string]string, len(s.requiredServiceAccountAnnotationKeys)+len(s.optionalServiceAccountAnnotationKeys))
for _, k := range s.requiredServiceAccountAnnotationKeys {
val, ok := sa.Annotations[k]
if !ok {
return "", nil, requiredAnnotationNotFoundError(k)
}
saAnnotations[k] = val
}
for _, k := range s.optionalServiceAccountAnnotationKeys {
if val, ok := sa.Annotations[k]; ok {
saAnnotations[k] = val
}
}
return sa.UID, saAnnotations, nil
}
// getServiceAccountToken returns a service account token for the service account.
func (s *serviceAccountProvider) getServiceAccountToken(podNamespace, podName, serviceAccountName string, podUID types.UID) (string, error) {
tr, err := s.getServiceAccountTokenFunc(podNamespace, serviceAccountName, &authenticationv1.TokenRequest{
Spec: authenticationv1.TokenRequestSpec{
Audiences: []string{s.audience},
// expirationSeconds is not set explicitly here. It has the same default value of "ExpirationSeconds" in the TokenRequestSpec.
BoundObjectRef: &authenticationv1.BoundObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: podName,
UID: podUID,
},
},
})
if err != nil {
return "", err
}
return tr.Status.Token, nil
}
// cacheEntry is the cache object that will be stored in cache.Store.
@ -204,15 +313,86 @@ func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt)
}
// Provide returns a credentialprovider.DockerConfig based on the credentials returned
// perPluginProvider holds the shared pluginProvider and the per-request information
// like podName, podNamespace, podUID and serviceAccountName.
// This is used to provide the per-request information to the pluginProvider.provide method, so
// that the plugin can use this information to get the pod's service account and generate bound service account tokens
// for plugins running in service account token mode.
type perPodPluginProvider struct {
name string
provider *pluginProvider
podNamespace string
podName string
podUID types.UID
serviceAccountName string
}
// Enabled always returns true since registration of the plugin via kubelet implies it should be enabled.
func (p *perPodPluginProvider) Enabled() bool {
return true
}
func (p *perPodPluginProvider) Provide(image string) credentialprovider.DockerConfig {
return p.provider.provide(image, p.podNamespace, p.podName, p.podUID, p.serviceAccountName)
}
// provide returns a credentialprovider.DockerConfig based on the credentials returned
// from cache or the exec plugin.
func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
func (p *pluginProvider) provide(image, podNamespace, podName string, podUID types.UID, serviceAccountName string) credentialprovider.DockerConfig {
if !p.isImageAllowed(image) {
return credentialprovider.DockerConfig{}
}
cachedConfig, found, err := p.getCachedCredentials(image)
var serviceAccountUID types.UID
var serviceAccountToken string
var saAnnotations map[string]string
var err error
var serviceAccountCacheKey string
if p.serviceAccountProvider != nil {
if len(serviceAccountName) == 0 && p.serviceAccountProvider.requireServiceAccount {
klog.V(5).Infof("Service account name is empty for pod %s/%s", podNamespace, podName)
return credentialprovider.DockerConfig{}
}
// If the service account name is empty and the plugin has indicated that invoking the plugin
// without a service account is allowed, we will continue without generating a service account token.
// This is useful for plugins that are running in service account token mode and are also used
// to pull images for pods without service accounts (e.g., static pods).
if len(serviceAccountName) > 0 {
if serviceAccountUID, saAnnotations, err = p.serviceAccountProvider.getServiceAccountData(podNamespace, serviceAccountName); err != nil {
var requiredAnnotationNotFoundErr requiredAnnotationNotFoundError
if errors.As(err, &requiredAnnotationNotFoundErr) {
// The required annotation could be a mechanism for individual workloads to opt in to using service account tokens
// for image pull. If any of the required annotation is missing, we will not invoke the plugin. We will log the error
// at higher verbosity level as it could be noisy.
klog.V(5).Infof("Failed to get service account data %s/%s: %v", podNamespace, serviceAccountName, err)
return credentialprovider.DockerConfig{}
}
klog.Errorf("Failed to get service account %s/%s: %v", podNamespace, serviceAccountName, err)
return credentialprovider.DockerConfig{}
}
if serviceAccountToken, err = p.serviceAccountProvider.getServiceAccountToken(podNamespace, podName, serviceAccountName, podUID); err != nil {
klog.Errorf("Error getting service account token %s/%s: %v", podNamespace, serviceAccountName, err)
return credentialprovider.DockerConfig{}
}
serviceAccountCacheKey, err = generateServiceAccountCacheKey(podNamespace, serviceAccountName, serviceAccountUID, saAnnotations)
if err != nil {
klog.Errorf("Error generating service account cache key: %v", err)
return credentialprovider.DockerConfig{}
}
}
}
// Check if the credentials are cached and return them if found.
cachedConfig, found, errCache := p.getCachedCredentials(image, serviceAccountCacheKey)
if errCache != nil {
klog.Errorf("Failed to get cached docker config: %v", err)
return credentialprovider.DockerConfig{}
}
@ -227,8 +407,23 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
// foo.bar.registry
// foo.bar.registry/image1
// foo.bar.registry/image2
res, err, _ := p.group.Do(image, func() (interface{}, error) {
return p.plugin.ExecPlugin(context.Background(), image)
// When the plugin is operating in the service account token mode, the singleflight key is the image plus the serviceAccountCacheKey
// which is generated from the service account namespace, name, uid and the annotations passed to the plugin.
singleFlightKey := image
if p.serviceAccountProvider != nil && len(serviceAccountName) > 0 {
// When the plugin is operating in the service account token mode, the singleflight key is the
// image + sa annotations + sa token.
// This does mean the singleflight key is different for each image pull request (even if the image is the same)
// and the workload is using the same service account.
// In the future, when we support caching of the service account token for pod-sa pairs, this will be singleflighted
// for different containers in the same pod using the same image.
if singleFlightKey, err = generateSingleFlightKey(image, getHashIfNotEmpty(serviceAccountToken), saAnnotations); err != nil {
klog.Errorf("Error generating singleflight key: %v", err)
return credentialprovider.DockerConfig{}
}
}
res, err, _ := p.group.Do(singleFlightKey, func() (interface{}, error) {
return p.plugin.ExecPlugin(context.Background(), image, serviceAccountToken, saAnnotations)
})
if err != nil {
@ -280,6 +475,12 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
expiresAt = p.clock.Now().Add(response.CacheDuration.Duration)
}
cacheKey, err = generateCacheKey(cacheKey, serviceAccountCacheKey)
if err != nil {
klog.Errorf("Error generating cache key: %v", err)
return credentialprovider.DockerConfig{}
}
cachedEntry := &cacheEntry{
key: cacheKey,
credentials: dockerConfig,
@ -310,7 +511,7 @@ func (p *pluginProvider) isImageAllowed(image string) bool {
}
// getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin.
func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) {
func (p *pluginProvider) getCachedCredentials(image, serviceAccountCacheKey string) (credentialprovider.DockerConfig, bool, error) {
p.Lock()
if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) {
// NewExpirationCache purges expired entries when List() is called
@ -321,7 +522,12 @@ func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.
}
p.Unlock()
obj, found, err := p.cache.GetByKey(image)
cacheKey, err := generateCacheKey(image, serviceAccountCacheKey)
if err != nil {
return nil, false, fmt.Errorf("error generating cache key: %w", err)
}
obj, found, err := p.cache.GetByKey(cacheKey)
if err != nil {
return nil, false, err
}
@ -331,7 +537,13 @@ func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.
}
registry := parseRegistry(image)
obj, found, err = p.cache.GetByKey(registry)
cacheKey, err = generateCacheKey(registry, serviceAccountCacheKey)
if err != nil {
return nil, false, fmt.Errorf("error generating cache key: %w", err)
}
obj, found, err = p.cache.GetByKey(cacheKey)
if err != nil {
return nil, false, err
}
@ -340,7 +552,12 @@ func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.
return obj.(*cacheEntry).credentials, true, nil
}
obj, found, err = p.cache.GetByKey(globalCacheKey)
cacheKey, err = generateCacheKey(globalCacheKey, serviceAccountCacheKey)
if err != nil {
return nil, false, fmt.Errorf("error generating cache key: %w", err)
}
obj, found, err = p.cache.GetByKey(cacheKey)
if err != nil {
return nil, false, err
}
@ -355,7 +572,7 @@ func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.
// Plugin is the interface calling ExecPlugin. This is mainly for testability
// so tests don't have to actually exec any processes.
type Plugin interface {
ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error)
ExecPlugin(ctx context.Context, image, serviceAccountToken string, serviceAccountAnnotations map[string]string) (*credentialproviderapi.CredentialProviderResponse, error)
}
// execPlugin is the implementation of the Plugin interface that execs a credential provider plugin based
@ -377,10 +594,10 @@ type execPlugin struct {
//
// The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and
// return CredentialProviderResponse via stdout.
func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) {
func (e *execPlugin) ExecPlugin(ctx context.Context, image, serviceAccountToken string, serviceAccountAnnotations map[string]string) (*credentialproviderapi.CredentialProviderResponse, error) {
klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name)
authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image}
authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image, ServiceAccountToken: serviceAccountToken, ServiceAccountAnnotations: serviceAccountAnnotations}
data, err := e.encodeRequest(authRequest)
if err != nil {
return nil, fmt.Errorf("failed to encode auth request: %w", err)
@ -499,3 +716,96 @@ func mergeEnvVars(sysEnvVars, credProviderVars []string) []string {
mergedEnvVars = append(mergedEnvVars, credProviderVars...)
return mergedEnvVars
}
// generateServiceAccountCacheKey generates the serviceaccount cache key to be used for
// 1. constructing the cache key for the service account token based plugin in addition to the actual cache key (image, registry, global).
// 2. the unique key to use singleflight for the plugin in addition to the image.
func generateServiceAccountCacheKey(serviceAccountNamespace, serviceAccountName string, serviceAccountUID types.UID, saAnnotations map[string]string) (string, error) {
b := cryptobyte.NewBuilder(nil)
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(serviceAccountNamespace))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(serviceAccountName))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(serviceAccountUID))
})
// add the length of annotations to the cache key
b.AddUint32(uint32(len(saAnnotations)))
// Sort the annotations by key to ensure the cache key is deterministic
keys := sets.StringKeySet(saAnnotations).List()
for _, k := range keys {
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(k))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(saAnnotations[k]))
})
}
keyBytes, err := b.Bytes()
if err != nil {
return "", err
}
return string(keyBytes), nil
}
func generateCacheKey(baseKey, serviceAccountCacheKey string) (string, error) {
b := cryptobyte.NewBuilder(nil)
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(baseKey))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(serviceAccountCacheKey))
})
keyBytes, err := b.Bytes()
if err != nil {
return "", err
}
return string(keyBytes), nil
}
func generateSingleFlightKey(image, saTokenHash string, saAnnotations map[string]string) (string, error) {
b := cryptobyte.NewBuilder(nil)
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(image))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(saTokenHash))
})
// add the length of annotations to the cache key
b.AddUint32(uint32(len(saAnnotations)))
// Sort the annotations by key to ensure the cache key is deterministic
keys := sets.StringKeySet(saAnnotations).List()
for _, k := range keys {
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(k))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes([]byte(saAnnotations[k]))
})
}
keyBytes, err := b.Bytes()
if err != nil {
return "", err
}
return string(keyBytes), nil
}
// getHashIfNotEmpty returns the sha256 hash of the data if it is not empty.
func getHashIfNotEmpty(data string) string {
if len(data) > 0 {
return fmt.Sprintf("sha256:%x", sha256.Sum256([]byte(data)))
}
return ""
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,98 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"sync"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/features"
)
type provider struct {
name string
impl *pluginProvider
}
var providersMutex sync.RWMutex
var providers = make([]provider, 0)
var seenProviderNames = sets.NewString()
func registerCredentialProviderPlugin(name string, p *pluginProvider) {
providersMutex.Lock()
defer providersMutex.Unlock()
if seenProviderNames.Has(name) {
klog.Fatalf("Credential provider %q was registered twice", name)
}
seenProviderNames.Insert(name)
providers = append(providers, provider{name, p})
klog.V(4).Infof("Registered credential provider %q", name)
}
type externalCredentialProviderKeyring struct {
providers []credentialprovider.DockerConfigProvider
}
func NewExternalCredentialProviderDockerKeyring(podNamespace, podName, podUID, serviceAccountName string) credentialprovider.DockerKeyring {
providersMutex.RLock()
defer providersMutex.RUnlock()
keyring := &externalCredentialProviderKeyring{
providers: make([]credentialprovider.DockerConfigProvider, 0, len(providers)),
}
for _, p := range providers {
if !p.impl.Enabled() {
continue
}
pp := &perPodPluginProvider{
name: p.name,
provider: p.impl,
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletServiceAccountTokenForCredentialProviders) {
klog.V(4).InfoS("Generating per pod credential provider", "provider", p.name, "podName", podName, "podNamespace", podNamespace, "podUID", podUID, "serviceAccountName", serviceAccountName)
pp.podNamespace = podNamespace
pp.podName = podName
pp.podUID = types.UID(podUID)
pp.serviceAccountName = serviceAccountName
} else {
klog.V(4).InfoS("Generating credential provider", "provider", p.name)
}
keyring.providers = append(keyring.providers, pp)
}
return keyring
}
func (k *externalCredentialProviderKeyring) Lookup(image string) ([]credentialprovider.AuthConfig, bool) {
keyring := &credentialprovider.BasicDockerKeyring{}
for _, p := range k.providers {
keyring.Add(p.Provide(image))
}
return keyring.Lookup(image)
}

View File

@ -1,70 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package credentialprovider
import (
"sync"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)
type provider struct {
name string
impl DockerConfigProvider
}
// All registered credential providers.
var providersMutex sync.Mutex
var providers = make([]provider, 0)
var seenProviderNames = sets.NewString()
// RegisterCredentialProvider is called by provider implementations on
// initialization to register themselves, like so:
//
// func init() {
// RegisterCredentialProvider("name", &myProvider{...})
// }
func RegisterCredentialProvider(name string, p DockerConfigProvider) {
providersMutex.Lock()
defer providersMutex.Unlock()
if seenProviderNames.Has(name) {
klog.Fatalf("Credential provider %q was registered twice", name)
}
seenProviderNames.Insert(name)
providers = append(providers, provider{name, p})
klog.V(4).Infof("Registered credential provider %q", name)
}
// NewDockerKeyring creates a DockerKeyring to use for resolving credentials,
// which draws from the set of registered credential providers.
func NewDockerKeyring() DockerKeyring {
keyring := &providersDockerKeyring{
Providers: make([]DockerConfigProvider, 0),
}
for _, p := range providers {
if p.impl.Enabled() {
klog.V(4).Infof("Registering credential provider: %v", p.name)
keyring.Providers = append(keyring.Providers, p.impl)
}
}
return keyring
}

View File

@ -42,15 +42,6 @@ type DockerConfigProvider interface {
// A DockerConfigProvider that simply reads the .dockercfg file
type defaultDockerConfigProvider struct{}
// init registers our default provider, which simply reads the .dockercfg file.
func init() {
RegisterCredentialProvider(".dockercfg",
&CachingDockerConfigProvider{
Provider: &defaultDockerConfigProvider{},
Lifetime: 5 * time.Minute,
})
}
// CachingDockerConfigProvider implements DockerConfigProvider by composing
// with another DockerConfigProvider and caching the DockerConfig it provides
// for a pre-specified lifetime.
@ -107,3 +98,16 @@ func (d *CachingDockerConfigProvider) Provide(image string) DockerConfig {
}
return config
}
// NewDefaultDockerKeyring creates a DockerKeyring to use for resolving credentials,
// which returns the default credentials from the .dockercfg file.
func NewDefaultDockerKeyring() DockerKeyring {
return &providersDockerKeyring{
Providers: []DockerConfigProvider{
&CachingDockerConfigProvider{
Provider: &defaultDockerConfigProvider{},
Lifetime: 5 * time.Minute,
},
},
}
}

View File

@ -152,7 +152,7 @@ type StreamingRuntime interface {
type ImageService interface {
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary. It returns a reference (digest or ID) to the pulled image.
PullImage(ctx context.Context, image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
PullImage(ctx context.Context, image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error)
// GetImageRef gets the reference (digest or ID) of the image which has already been in
// the local storage. It returns ("", nil) if the image isn't in the local storage.
GetImageRef(ctx context.Context, image ImageSpec) (string, error)

View File

@ -308,7 +308,7 @@ func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, container
return f.Err
}
func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error) {
f.Lock()
f.CalledFunctions = append(f.CalledFunctions, "PullImage")
if f.Err == nil {

View File

@ -990,9 +990,9 @@ func (_c *MockRuntime_ListPodSandboxMetrics_Call) RunAndReturn(run func(context.
return _c
}
// PullImage provides a mock function with given fields: ctx, image, pullSecrets, podSandboxConfig
func (_m *MockRuntime) PullImage(ctx context.Context, image container.ImageSpec, pullSecrets []corev1.Secret, podSandboxConfig *v1.PodSandboxConfig) (string, error) {
ret := _m.Called(ctx, image, pullSecrets, podSandboxConfig)
// PullImage provides a mock function with given fields: ctx, image, pullSecrets, podSandboxConfig, serviceAccountName
func (_m *MockRuntime) PullImage(ctx context.Context, image container.ImageSpec, pullSecrets []corev1.Secret, podSandboxConfig *v1.PodSandboxConfig, serviceAccountName string) (string, error) {
ret := _m.Called(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
if len(ret) == 0 {
panic("no return value specified for PullImage")
@ -1000,17 +1000,17 @@ func (_m *MockRuntime) PullImage(ctx context.Context, image container.ImageSpec,
var r0 string
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig) (string, error)); ok {
return rf(ctx, image, pullSecrets, podSandboxConfig)
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) (string, error)); ok {
return rf(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
}
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig) string); ok {
r0 = rf(ctx, image, pullSecrets, podSandboxConfig)
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) string); ok {
r0 = rf(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig) error); ok {
r1 = rf(ctx, image, pullSecrets, podSandboxConfig)
if rf, ok := ret.Get(1).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) error); ok {
r1 = rf(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
} else {
r1 = ret.Error(1)
}
@ -1028,13 +1028,14 @@ type MockRuntime_PullImage_Call struct {
// - image container.ImageSpec
// - pullSecrets []corev1.Secret
// - podSandboxConfig *v1.PodSandboxConfig
func (_e *MockRuntime_Expecter) PullImage(ctx interface{}, image interface{}, pullSecrets interface{}, podSandboxConfig interface{}) *MockRuntime_PullImage_Call {
return &MockRuntime_PullImage_Call{Call: _e.mock.On("PullImage", ctx, image, pullSecrets, podSandboxConfig)}
// - serviceAccountName string
func (_e *MockRuntime_Expecter) PullImage(ctx interface{}, image interface{}, pullSecrets interface{}, podSandboxConfig interface{}, serviceAccountName interface{}) *MockRuntime_PullImage_Call {
return &MockRuntime_PullImage_Call{Call: _e.mock.On("PullImage", ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)}
}
func (_c *MockRuntime_PullImage_Call) Run(run func(ctx context.Context, image container.ImageSpec, pullSecrets []corev1.Secret, podSandboxConfig *v1.PodSandboxConfig)) *MockRuntime_PullImage_Call {
func (_c *MockRuntime_PullImage_Call) Run(run func(ctx context.Context, image container.ImageSpec, pullSecrets []corev1.Secret, podSandboxConfig *v1.PodSandboxConfig, serviceAccountName string)) *MockRuntime_PullImage_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(container.ImageSpec), args[2].([]corev1.Secret), args[3].(*v1.PodSandboxConfig))
run(args[0].(context.Context), args[1].(container.ImageSpec), args[2].([]corev1.Secret), args[3].(*v1.PodSandboxConfig), args[4].(string))
})
return _c
}
@ -1044,7 +1045,7 @@ func (_c *MockRuntime_PullImage_Call) Return(_a0 string, _a1 error) *MockRuntime
return _c
}
func (_c *MockRuntime_PullImage_Call) RunAndReturn(run func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig) (string, error)) *MockRuntime_PullImage_Call {
func (_c *MockRuntime_PullImage_Call) RunAndReturn(run func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) (string, error)) *MockRuntime_PullImage_Call {
_c.Call.Return(run)
return _c
}

View File

@ -44,9 +44,9 @@ type throttledImageService struct {
limiter flowcontrol.RateLimiter
}
func (ts throttledImageService) PullImage(ctx context.Context, image kubecontainer.ImageSpec, secrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
func (ts throttledImageService) PullImage(ctx context.Context, image kubecontainer.ImageSpec, secrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error) {
if ts.limiter.TryAccept() {
return ts.ImageService.PullImage(ctx, image, secrets, podSandboxConfig)
return ts.ImageService.PullImage(ctx, image, secrets, podSandboxConfig, serviceAccountName)
}
return "", fmt.Errorf("pull QPS exceeded")
}

View File

@ -175,7 +175,7 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
m.logIt(objRef, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", imgRef), klog.Info)
startTime := time.Now()
pullChan := make(chan pullResult)
m.puller.pullImage(ctx, spec, pullSecrets, pullChan, podSandboxConfig)
m.puller.pullImage(ctx, spec, pullSecrets, pullChan, podSandboxConfig, pod.Spec.ServiceAccountName)
imagePullResult := <-pullChan
if imagePullResult.err != nil {
m.logIt(objRef, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", imgRef, imagePullResult.err), klog.Warning)

View File

@ -34,7 +34,7 @@ type pullResult struct {
}
type imagePuller interface {
pullImage(context.Context, kubecontainer.ImageSpec, []v1.Secret, chan<- pullResult, *runtimeapi.PodSandboxConfig)
pullImage(context.Context, kubecontainer.ImageSpec, []v1.Secret, chan<- pullResult, *runtimeapi.PodSandboxConfig, string)
}
var _, _ imagePuller = &parallelImagePuller{}, &serialImagePuller{}
@ -51,14 +51,14 @@ func newParallelImagePuller(imageService kubecontainer.ImageService, maxParallel
return &parallelImagePuller{imageService, make(chan struct{}, *maxParallelImagePulls)}
}
func (pip *parallelImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
func (pip *parallelImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) {
go func() {
if pip.tokens != nil {
pip.tokens <- struct{}{}
defer func() { <-pip.tokens }()
}
startTime := time.Now()
imageRef, err := pip.imageService.PullImage(ctx, spec, pullSecrets, podSandboxConfig)
imageRef, err := pip.imageService.PullImage(ctx, spec, pullSecrets, podSandboxConfig, serviceAccountName)
var size uint64
if err == nil && imageRef != "" {
// Getting the image size with best effort, ignoring the error.
@ -93,22 +93,24 @@ type imagePullRequest struct {
pullSecrets []v1.Secret
pullChan chan<- pullResult
podSandboxConfig *runtimeapi.PodSandboxConfig
serviceAccountName string
}
func (sip *serialImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
func (sip *serialImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) {
sip.pullRequests <- &imagePullRequest{
ctx: ctx,
spec: spec,
pullSecrets: pullSecrets,
pullChan: pullChan,
podSandboxConfig: podSandboxConfig,
serviceAccountName: serviceAccountName,
}
}
func (sip *serialImagePuller) processImagePullRequests() {
for pullRequest := range sip.pullRequests {
startTime := time.Now()
imageRef, err := sip.imageService.PullImage(pullRequest.ctx, pullRequest.spec, pullRequest.pullSecrets, pullRequest.podSandboxConfig)
imageRef, err := sip.imageService.PullImage(pullRequest.ctx, pullRequest.spec, pullRequest.pullSecrets, pullRequest.podSandboxConfig, pullRequest.serviceAccountName)
var size uint64
if err == nil && imageRef != "" {
// Getting the image size with best effort, ignoring the error.

View File

@ -41,6 +41,7 @@ import (
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"k8s.io/client-go/informers"
"k8s.io/mount-utils"
@ -713,6 +714,19 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
}
tokenManager := token.NewManager(kubeDeps.KubeClient)
getServiceAccount := func(namespace, name string) (*v1.ServiceAccount, error) {
return nil, fmt.Errorf("get service account is not implemented")
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletServiceAccountTokenForCredentialProviders) {
getServiceAccount = func(namespace, name string) (*v1.ServiceAccount, error) {
if klet.kubeClient == nil {
return nil, errors.New("cannot get ServiceAccounts when kubelet is in standalone mode")
}
return klet.kubeClient.CoreV1().ServiceAccounts(namespace).Get(ctx, name, metav1.GetOptions{})
}
}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
@ -747,6 +761,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
*kubeCfg.MemoryThrottlingFactor,
kubeDeps.PodStartupLatencyTracker,
kubeDeps.TracerProvider,
tokenManager,
getServiceAccount,
)
if err != nil {
return nil, err
@ -876,8 +892,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps.Recorder)
}
tokenManager := token.NewManager(kubeDeps.KubeClient)
var clusterTrustBundleManager clustertrustbundle.Manager = &clustertrustbundle.NoopManager{}
if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) {
clusterTrustBundleManager = clustertrustbundle.NewLazyInformerManager(ctx, kubeDeps.KubeClient, 2*int(kubeCfg.MaxPods))

View File

@ -40,6 +40,7 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
core "k8s.io/client-go/testing"
"k8s.io/mount-utils"
@ -3424,6 +3425,8 @@ func TestSyncPodSpans(t *testing.T) {
*kubeCfg.MemoryThrottlingFactor,
kubeletutil.NewPodStartupLatencyTracker(),
tp,
token.NewManager(kubelet.kubeClient),
func(string, string) (*v1.ServiceAccount, error) { return nil, nil },
)
assert.NoError(t, err)

View File

@ -24,6 +24,8 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/credentialprovider"
credentialproviderplugin "k8s.io/kubernetes/pkg/credentialprovider/plugin"
credentialprovidersecrets "k8s.io/kubernetes/pkg/credentialprovider/secrets"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -32,14 +34,26 @@ import (
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
func (m *kubeGenericRuntimeManager) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
func (m *kubeGenericRuntimeManager) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error) {
img := image.Image
repoToPull, _, _, err := parsers.ParseImageName(img)
if err != nil {
return "", err
}
keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, m.keyring)
// construct the dynamic keyring using the providers we have in the kubelet
var podName, podNamespace, podUID string
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletServiceAccountTokenForCredentialProviders) {
sandboxMetadata := podSandboxConfig.GetMetadata()
podName = sandboxMetadata.Name
podNamespace = sandboxMetadata.Namespace
podUID = sandboxMetadata.Uid
}
externalCredentialProviderKeyring := credentialproviderplugin.NewExternalCredentialProviderDockerKeyring(podNamespace, podName, podUID, serviceAccountName)
keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, credentialprovider.UnionDockerKeyring{m.keyring, externalCredentialProviderKeyring})
if err != nil {
return "", err
}

View File

@ -37,7 +37,7 @@ func TestPullImage(t *testing.T) {
_, _, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
assert.NoError(t, err)
assert.Equal(t, "busybox", imageRef)
@ -53,12 +53,12 @@ func TestPullImageWithError(t *testing.T) {
assert.NoError(t, err)
// trying to pull an image with an invalid name should return an error
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: ":invalid"}, nil, nil)
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: ":invalid"}, nil, nil, "")
assert.Error(t, err)
assert.Equal(t, "", imageRef)
fakeImageService.InjectError("PullImage", fmt.Errorf("test-error"))
imageRef, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
imageRef, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
assert.Error(t, err)
assert.Equal(t, "", imageRef)
@ -75,7 +75,7 @@ func TestPullImageWithInvalidImageName(t *testing.T) {
fakeImageService.SetFakeImages(imageList)
for _, val := range imageList {
ctx := context.Background()
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: val}, nil, nil)
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: val}, nil, nil, "")
assert.Error(t, err)
assert.Equal(t, "", imageRef)
@ -196,7 +196,7 @@ func TestRemoveImage(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
assert.NoError(t, err)
assert.Len(t, fakeImageService.Images, 1)
@ -219,7 +219,7 @@ func TestRemoveImageWithError(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
assert.NoError(t, err)
assert.Len(t, fakeImageService.Images, 1)
@ -324,7 +324,7 @@ func TestPullWithSecrets(t *testing.T) {
_, fakeImageService, fakeManager, err := customTestRuntimeManager(builtInKeyRing)
require.NoError(t, err)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: test.imageName}, test.passedSecrets, nil)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: test.imageName}, test.passedSecrets, nil, "")
require.NoError(t, err)
fakeImageService.AssertImagePulledWithAuth(t, &runtimeapi.ImageSpec{Image: test.imageName, Annotations: make(map[string]string)}, test.expectedAuth, description)
}
@ -375,7 +375,7 @@ func TestPullWithSecretsWithError(t *testing.T) {
fakeImageService.InjectError("PullImage", fmt.Errorf("test-error"))
}
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: test.imageName}, test.passedSecrets, nil)
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: test.imageName}, test.passedSecrets, nil, "")
assert.Error(t, err)
assert.Equal(t, "", imageRef)
@ -398,7 +398,7 @@ func TestPullThenListWithAnnotations(t *testing.T) {
},
}
_, err = fakeManager.PullImage(ctx, imageSpec, nil, nil)
_, err = fakeManager.PullImage(ctx, imageSpec, nil, nil, "")
assert.NoError(t, err)
images, err := fakeManager.ListImages(ctx)

View File

@ -28,8 +28,6 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"go.opentelemetry.io/otel/trace"
grpcstatus "google.golang.org/grpc/status"
crierror "k8s.io/cri-api/pkg/errors"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -44,7 +42,8 @@ import (
"k8s.io/component-base/logs/logreduction"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
crierror "k8s.io/cri-api/pkg/errors"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/credentialprovider"
@ -62,6 +61,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/token"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format"
@ -223,6 +223,8 @@ func NewKubeGenericRuntimeManager(
memoryThrottlingFactor float64,
podPullingTimeRecorder images.ImagePodPullingTimeRecorder,
tracerProvider trace.TracerProvider,
tokenManager *token.Manager,
getServiceAccount func(string, string) (*v1.ServiceAccount, error),
) (KubeGenericRuntime, error) {
ctx := context.Background()
runtimeService = newInstrumentedRuntimeService(runtimeService)
@ -277,12 +279,12 @@ func NewKubeGenericRuntimeManager(
"apiVersion", typedVersion.RuntimeApiVersion)
if imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "" {
if err := plugin.RegisterCredentialProviderPlugins(imageCredentialProviderConfigFile, imageCredentialProviderBinDir); err != nil {
if err := plugin.RegisterCredentialProviderPlugins(imageCredentialProviderConfigFile, imageCredentialProviderBinDir, tokenManager.GetServiceAccountToken, getServiceAccount); err != nil {
klog.ErrorS(err, "Failed to register CRI auth plugins")
os.Exit(1)
}
}
kubeRuntimeManager.keyring = credentialprovider.NewDockerKeyring()
kubeRuntimeManager.keyring = credentialprovider.NewDefaultDockerKeyring()
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),

View File

@ -27,14 +27,13 @@ import (
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
noopoteltrace "go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"