kubelet imagepuller: PullImage() - accept TrackAuthConfigs directly

The image puller's PullImage() method should be just a dumb pull
without any further logic. Make it accept everything it needs
to pull an image and defer any other magic to the image manager.
This commit is contained in:
Stanislav Láznička 2024-10-16 15:45:01 +02:00
parent 09284d926c
commit cb7468b077
No known key found for this signature in database
GPG Key ID: F8D8054395A1D157
13 changed files with 193 additions and 150 deletions

View File

@ -87,11 +87,12 @@ func NewExternalCredentialProviderDockerKeyring(podNamespace, podName, podUID, s
return keyring
}
func (k *externalCredentialProviderKeyring) Lookup(image string) ([]credentialprovider.AuthConfig, bool) {
func (k *externalCredentialProviderKeyring) Lookup(image string) ([]credentialprovider.TrackedAuthConfig, bool) {
keyring := &credentialprovider.BasicDockerKeyring{}
for _, p := range k.providers {
keyring.Add(p.Provide(image))
// TODO: modify the credentialprovider.CredentialSource to contain the SA/pod information
keyring.Add(nil, p.Provide(image))
}
return keyring.Lookup(image)

View File

@ -33,6 +33,7 @@ import (
"k8s.io/client-go/util/flowcontrol"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/volume"
)
@ -151,8 +152,11 @@ type StreamingRuntime interface {
// ImageService interfaces allows to work with image service.
type ImageService interface {
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary. It returns a reference (digest or ID) to the pulled image.
PullImage(ctx context.Context, image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error)
// secrets if necessary.
// It returns a reference (digest or ID) to the pulled image and the credentials
// that were used to pull the image. If the returned credentials are nil, the
// pull was anonymous.
PullImage(ctx context.Context, image ImageSpec, credentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error)
// GetImageRef gets the reference (digest or ID) of the image which has already been in
// the local storage. It returns ("", nil) if the image isn't in the local storage.
GetImageRef(ctx context.Context, image ImageSpec) (string, error)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/flowcontrol"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/volume"
)
@ -308,7 +309,7 @@ func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, container
return f.Err
}
func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error) {
func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, creds []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error) {
f.Lock()
f.CalledFunctions = append(f.CalledFunctions, "PullImage")
if f.Err == nil {
@ -319,9 +320,15 @@ func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSp
f.ImageList = append(f.ImageList, i)
}
// if credentials were supplied for the pull at least return the first in the list
var retCreds *credentialprovider.TrackedAuthConfig = nil
if len(creds) > 0 {
retCreds = &creds[0]
}
if !f.BlockImagePulls {
f.Unlock()
return image.Image, f.Err
return image.Image, retCreds, f.Err
}
retErr := f.Err
@ -334,7 +341,8 @@ func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSp
case <-ctx.Done():
case <-f.imagePullTokenBucket:
}
return image.Image, retErr
return image.Image, retCreds, retErr
}
// UnblockImagePulls unblocks a certain number of image pulls, if BlockImagePulls is true.

View File

@ -25,6 +25,8 @@ import (
corev1 "k8s.io/api/core/v1"
credentialprovider "k8s.io/kubernetes/pkg/credentialprovider"
flowcontrol "k8s.io/client-go/util/flowcontrol"
io "io"
@ -990,32 +992,41 @@ func (_c *MockRuntime_ListPodSandboxMetrics_Call) RunAndReturn(run func(context.
return _c
}
// PullImage provides a mock function with given fields: ctx, image, pullSecrets, podSandboxConfig, serviceAccountName
func (_m *MockRuntime) PullImage(ctx context.Context, image container.ImageSpec, pullSecrets []corev1.Secret, podSandboxConfig *v1.PodSandboxConfig, serviceAccountName string) (string, error) {
ret := _m.Called(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
// PullImage provides a mock function with given fields: ctx, image, credentials, podSandboxConfig
func (_m *MockRuntime) PullImage(ctx context.Context, image container.ImageSpec, credentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *v1.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error) {
ret := _m.Called(ctx, image, credentials, podSandboxConfig)
if len(ret) == 0 {
panic("no return value specified for PullImage")
}
var r0 string
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) (string, error)); ok {
return rf(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
var r1 *credentialprovider.TrackedAuthConfig
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []credentialprovider.TrackedAuthConfig, *v1.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error)); ok {
return rf(ctx, image, credentials, podSandboxConfig)
}
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) string); ok {
r0 = rf(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
if rf, ok := ret.Get(0).(func(context.Context, container.ImageSpec, []credentialprovider.TrackedAuthConfig, *v1.PodSandboxConfig) string); ok {
r0 = rf(ctx, image, credentials, podSandboxConfig)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) error); ok {
r1 = rf(ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)
if rf, ok := ret.Get(1).(func(context.Context, container.ImageSpec, []credentialprovider.TrackedAuthConfig, *v1.PodSandboxConfig) *credentialprovider.TrackedAuthConfig); ok {
r1 = rf(ctx, image, credentials, podSandboxConfig)
} else {
r1 = ret.Error(1)
if ret.Get(1) != nil {
r1 = ret.Get(1).(*credentialprovider.TrackedAuthConfig)
}
}
return r0, r1
if rf, ok := ret.Get(2).(func(context.Context, container.ImageSpec, []credentialprovider.TrackedAuthConfig, *v1.PodSandboxConfig) error); ok {
r2 = rf(ctx, image, credentials, podSandboxConfig)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockRuntime_PullImage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullImage'
@ -1026,26 +1037,25 @@ type MockRuntime_PullImage_Call struct {
// PullImage is a helper method to define mock.On call
// - ctx context.Context
// - image container.ImageSpec
// - pullSecrets []corev1.Secret
// - credentials []credentialprovider.TrackedAuthConfig
// - podSandboxConfig *v1.PodSandboxConfig
// - serviceAccountName string
func (_e *MockRuntime_Expecter) PullImage(ctx interface{}, image interface{}, pullSecrets interface{}, podSandboxConfig interface{}, serviceAccountName interface{}) *MockRuntime_PullImage_Call {
return &MockRuntime_PullImage_Call{Call: _e.mock.On("PullImage", ctx, image, pullSecrets, podSandboxConfig, serviceAccountName)}
func (_e *MockRuntime_Expecter) PullImage(ctx interface{}, image interface{}, credentials interface{}, podSandboxConfig interface{}) *MockRuntime_PullImage_Call {
return &MockRuntime_PullImage_Call{Call: _e.mock.On("PullImage", ctx, image, credentials, podSandboxConfig)}
}
func (_c *MockRuntime_PullImage_Call) Run(run func(ctx context.Context, image container.ImageSpec, pullSecrets []corev1.Secret, podSandboxConfig *v1.PodSandboxConfig, serviceAccountName string)) *MockRuntime_PullImage_Call {
func (_c *MockRuntime_PullImage_Call) Run(run func(ctx context.Context, image container.ImageSpec, credentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *v1.PodSandboxConfig)) *MockRuntime_PullImage_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(container.ImageSpec), args[2].([]corev1.Secret), args[3].(*v1.PodSandboxConfig), args[4].(string))
run(args[0].(context.Context), args[1].(container.ImageSpec), args[2].([]credentialprovider.TrackedAuthConfig), args[3].(*v1.PodSandboxConfig))
})
return _c
}
func (_c *MockRuntime_PullImage_Call) Return(_a0 string, _a1 error) *MockRuntime_PullImage_Call {
_c.Call.Return(_a0, _a1)
func (_c *MockRuntime_PullImage_Call) Return(_a0 string, _a1 *credentialprovider.TrackedAuthConfig, _a2 error) *MockRuntime_PullImage_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *MockRuntime_PullImage_Call) RunAndReturn(run func(context.Context, container.ImageSpec, []corev1.Secret, *v1.PodSandboxConfig, string) (string, error)) *MockRuntime_PullImage_Call {
func (_c *MockRuntime_PullImage_Call) RunAndReturn(run func(context.Context, container.ImageSpec, []credentialprovider.TrackedAuthConfig, *v1.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error)) *MockRuntime_PullImage_Call {
_c.Call.Return(run)
return _c
}

View File

@ -20,9 +20,9 @@ import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/flowcontrol"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -44,9 +44,9 @@ type throttledImageService struct {
limiter flowcontrol.RateLimiter
}
func (ts throttledImageService) PullImage(ctx context.Context, image kubecontainer.ImageSpec, secrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error) {
func (ts throttledImageService) PullImage(ctx context.Context, image kubecontainer.ImageSpec, credentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, *credentialprovider.TrackedAuthConfig, error) {
if ts.limiter.TryAccept() {
return ts.ImageService.PullImage(ctx, image, secrets, podSandboxConfig, serviceAccountName)
return ts.ImageService.PullImage(ctx, image, credentials, podSandboxConfig)
}
return "", fmt.Errorf("pull QPS exceeded")
return "", nil, fmt.Errorf("pull QPS exceeded")
}

View File

@ -25,12 +25,17 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
crierrors "k8s.io/cri-api/pkg/errors"
"k8s.io/kubernetes/pkg/credentialprovider"
credentialproviderplugin "k8s.io/kubernetes/pkg/credentialprovider/plugin"
credentialprovidersecrets "k8s.io/kubernetes/pkg/credentialprovider/secrets"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/metrics"
@ -50,7 +55,8 @@ type imageManager struct {
prevPullErrMsg sync.Map
// It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly.
puller imagePuller
puller imagePuller
nodeKeyring credentialprovider.DockerKeyring
podPullingTimeRecorder ImagePodPullingTimeRecorder
}
@ -58,7 +64,7 @@ type imageManager struct {
var _ ImageManager = &imageManager{}
// NewImageManager instantiates a new ImageManager object.
func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, maxParallelImagePulls *int32, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager {
func NewImageManager(recorder record.EventRecorder, nodeKeyring credentialprovider.DockerKeyring, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, maxParallelImagePulls *int32, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager {
imageService = throttleImagePulling(imageService, qps, burst)
var puller imagePuller
@ -70,6 +76,7 @@ func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.I
return &imageManager{
recorder: recorder,
imageService: imageService,
nodeKeyring: nodeKeyring,
backOff: imageBackOff,
puller: puller,
podPullingTimeRecorder: podPullingTimeRecorder,
@ -153,7 +160,39 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
return imageRef, msg, nil
}
backOffKey := fmt.Sprintf("%s_%s", pod.UID, imgRef)
img := spec.Image
repoToPull, _, _, err := parsers.ParseImageName(img)
if err != nil {
return "", err.Error(), err
}
// construct the dynamic keyring using the providers we have in the kubelet
var podName, podNamespace, podUID string
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletServiceAccountTokenForCredentialProviders) {
sandboxMetadata := podSandboxConfig.GetMetadata()
podName = sandboxMetadata.Name
podNamespace = sandboxMetadata.Namespace
podUID = sandboxMetadata.Uid
}
externalCredentialProviderKeyring := credentialproviderplugin.NewExternalCredentialProviderDockerKeyring(
podNamespace,
podName,
podUID,
pod.Spec.ServiceAccountName)
keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, credentialprovider.UnionDockerKeyring{m.nodeKeyring, externalCredentialProviderKeyring})
if err != nil {
return "", err.Error(), err
}
pullCredentials, _ := keyring.Lookup(repoToPull)
return m.pullImage(ctx, logPrefix, objRef, pod.UID, imgRef, spec, pullCredentials, podSandboxConfig)
}
func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *v1.ObjectReference, podUID types.UID, imgRef string, imgSpec kubecontainer.ImageSpec, pullCredentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (imageRef, message string, err error) {
backOffKey := fmt.Sprintf("%s_%s", podUID, imgRef)
if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", imgRef)
m.logIt(objRef, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
@ -171,16 +210,16 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
// Ensure that the map cannot grow indefinitely.
m.prevPullErrMsg.Delete(backOffKey)
m.podPullingTimeRecorder.RecordImageStartedPulling(pod.UID)
m.podPullingTimeRecorder.RecordImageStartedPulling(podUID)
m.logIt(objRef, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", imgRef), klog.Info)
startTime := time.Now()
pullChan := make(chan pullResult)
m.puller.pullImage(ctx, spec, pullSecrets, pullChan, podSandboxConfig, pod.Spec.ServiceAccountName)
m.puller.pullImage(ctx, imgSpec, pullCredentials, pullChan, podSandboxConfig)
imagePullResult := <-pullChan
if imagePullResult.err != nil {
m.logIt(objRef, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", imgRef, imagePullResult.err), klog.Warning)
m.backOff.Next(backOffKey, m.backOff.Clock.Now())
msg, err := evalCRIPullErr(imgRef, imagePullResult.err)
// Store the actual pull error for providing that information during
@ -189,12 +228,13 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
return "", msg, err
}
m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID)
m.podPullingTimeRecorder.RecordImageFinishedPulling(podUID)
imagePullDuration := time.Since(startTime).Truncate(time.Millisecond)
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v (%v including waiting). Image size: %v bytes.",
imgRef, imagePullResult.pullDuration.Truncate(time.Millisecond), imagePullDuration, imagePullResult.imageSize), klog.Info)
metrics.ImagePullDuration.WithLabelValues(metrics.GetImageSizeBucket(imagePullResult.imageSize)).Observe(imagePullDuration.Seconds())
m.backOff.GC()
return imagePullResult.imageRef, "", nil
}

View File

@ -35,6 +35,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
crierrors "k8s.io/cri-api/pkg/errors"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/features"
. "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
@ -380,7 +381,7 @@ func pullerTestEnv(t *testing.T, c pullerTestCase, serialized bool, maxParallelI
fakePodPullingTimeRecorder = &mockPodPullingTimeRecorder{}
puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, maxParallelImagePulls, c.qps, c.burst, fakePodPullingTimeRecorder)
puller = NewImageManager(fakeRecorder, &credentialprovider.BasicDockerKeyring{}, fakeRuntime, backOff, serialized, maxParallelImagePulls, c.qps, c.burst, fakePodPullingTimeRecorder)
return
}

View File

@ -20,21 +20,22 @@ import (
"context"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
type pullResult struct {
imageRef string
imageSize uint64
err error
pullDuration time.Duration
imageRef string
imageSize uint64
err error
pullDuration time.Duration
credentialsUsed *credentialprovider.TrackedAuthConfig
}
type imagePuller interface {
pullImage(context.Context, kubecontainer.ImageSpec, []v1.Secret, chan<- pullResult, *runtimeapi.PodSandboxConfig, string)
pullImage(context.Context, kubecontainer.ImageSpec, []credentialprovider.TrackedAuthConfig, chan<- pullResult, *runtimeapi.PodSandboxConfig)
}
var _, _ imagePuller = &parallelImagePuller{}, &serialImagePuller{}
@ -51,24 +52,25 @@ func newParallelImagePuller(imageService kubecontainer.ImageService, maxParallel
return &parallelImagePuller{imageService, make(chan struct{}, *maxParallelImagePulls)}
}
func (pip *parallelImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) {
func (pip *parallelImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, credentials []credentialprovider.TrackedAuthConfig, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
go func() {
if pip.tokens != nil {
pip.tokens <- struct{}{}
defer func() { <-pip.tokens }()
}
startTime := time.Now()
imageRef, err := pip.imageService.PullImage(ctx, spec, pullSecrets, podSandboxConfig, serviceAccountName)
imageRef, creds, err := pip.imageService.PullImage(ctx, spec, credentials, podSandboxConfig)
var size uint64
if err == nil && imageRef != "" {
// Getting the image size with best effort, ignoring the error.
size, _ = pip.imageService.GetImageSize(ctx, spec)
}
pullChan <- pullResult{
imageRef: imageRef,
imageSize: size,
err: err,
pullDuration: time.Since(startTime),
imageRef: imageRef,
imageSize: size,
err: err,
pullDuration: time.Since(startTime),
credentialsUsed: creds,
}
}()
}
@ -88,29 +90,27 @@ func newSerialImagePuller(imageService kubecontainer.ImageService) imagePuller {
}
type imagePullRequest struct {
ctx context.Context
spec kubecontainer.ImageSpec
pullSecrets []v1.Secret
pullChan chan<- pullResult
podSandboxConfig *runtimeapi.PodSandboxConfig
serviceAccountName string
ctx context.Context
spec kubecontainer.ImageSpec
credentials []credentialprovider.TrackedAuthConfig
pullChan chan<- pullResult
podSandboxConfig *runtimeapi.PodSandboxConfig
}
func (sip *serialImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) {
func (sip *serialImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, credentials []credentialprovider.TrackedAuthConfig, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
sip.pullRequests <- &imagePullRequest{
ctx: ctx,
spec: spec,
pullSecrets: pullSecrets,
pullChan: pullChan,
podSandboxConfig: podSandboxConfig,
serviceAccountName: serviceAccountName,
ctx: ctx,
spec: spec,
credentials: credentials,
pullChan: pullChan,
podSandboxConfig: podSandboxConfig,
}
}
func (sip *serialImagePuller) processImagePullRequests() {
for pullRequest := range sip.pullRequests {
startTime := time.Now()
imageRef, err := sip.imageService.PullImage(pullRequest.ctx, pullRequest.spec, pullRequest.pullSecrets, pullRequest.podSandboxConfig, pullRequest.serviceAccountName)
imageRef, creds, err := sip.imageService.PullImage(pullRequest.ctx, pullRequest.spec, pullRequest.credentials, pullRequest.podSandboxConfig)
var size uint64
if err == nil && imageRef != "" {
// Getting the image size with best effort, ignoring the error.
@ -120,8 +120,9 @@ func (sip *serialImagePuller) processImagePullRequests() {
imageRef: imageRef,
imageSize: size,
err: err,
// Note: pullDuration includes credential resolution and getting the image size.
pullDuration: time.Since(startTime),
// Note: pullDuration includes getting the image size.
pullDuration: time.Since(startTime),
credentialsUsed: creds,
}
}
}

View File

@ -94,7 +94,7 @@ func (f *fakePodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID)
func (f *fakePodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {}
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) {
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) {
ctx := context.Background()
recorder := &record.FakeRecorder{}
logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2, 10, metav1.Duration{Duration: 10 * time.Second})
@ -113,7 +113,6 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
runtimeHelper: runtimeHelper,
runtimeService: runtimeService,
imageService: imageService,
keyring: keyring,
seccompProfileRoot: fakeSeccompProfileRoot,
internalLifecycle: cm.NewFakeInternalContainerLifecycle(),
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
@ -134,6 +133,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),
&credentialprovider.BasicDockerKeyring{},
kubeRuntimeManager,
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,

View File

@ -19,62 +19,35 @@ package kuberuntime
import (
"context"
v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/credentialprovider"
credentialproviderplugin "k8s.io/kubernetes/pkg/credentialprovider/plugin"
credentialprovidersecrets "k8s.io/kubernetes/pkg/credentialprovider/secrets"
crededentialprovider "k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/parsers"
)
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
func (m *kubeGenericRuntimeManager) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, serviceAccountName string) (string, error) {
func (m *kubeGenericRuntimeManager) PullImage(ctx context.Context, image kubecontainer.ImageSpec, credentials []crededentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, *crededentialprovider.TrackedAuthConfig, error) {
img := image.Image
repoToPull, _, _, err := parsers.ParseImageName(img)
if err != nil {
return "", err
}
// construct the dynamic keyring using the providers we have in the kubelet
var podName, podNamespace, podUID string
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletServiceAccountTokenForCredentialProviders) {
sandboxMetadata := podSandboxConfig.GetMetadata()
podName = sandboxMetadata.Name
podNamespace = sandboxMetadata.Namespace
podUID = sandboxMetadata.Uid
}
externalCredentialProviderKeyring := credentialproviderplugin.NewExternalCredentialProviderDockerKeyring(podNamespace, podName, podUID, serviceAccountName)
keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, credentialprovider.UnionDockerKeyring{m.keyring, externalCredentialProviderKeyring})
if err != nil {
return "", err
}
imgSpec := toRuntimeAPIImageSpec(image)
creds, withCredentials := keyring.Lookup(repoToPull)
if !withCredentials {
if len(credentials) == 0 {
klog.V(3).InfoS("Pulling image without credentials", "image", img)
imageRef, err := m.imageService.PullImage(ctx, imgSpec, nil, podSandboxConfig)
if err != nil {
klog.ErrorS(err, "Failed to pull image", "image", img)
return "", err
return "", nil, err
}
return imageRef, nil
return imageRef, nil, nil
}
var pullErrs []error
for _, currentCreds := range creds {
for _, currentCreds := range credentials {
auth := &runtimeapi.AuthConfig{
Username: currentCreds.Username,
Password: currentCreds.Password,
@ -87,13 +60,13 @@ func (m *kubeGenericRuntimeManager) PullImage(ctx context.Context, image kubecon
imageRef, err := m.imageService.PullImage(ctx, imgSpec, auth, podSandboxConfig)
// If there was no error, return success
if err == nil {
return imageRef, nil
return imageRef, &currentCreds, nil
}
pullErrs = append(pullErrs, err)
}
return "", utilerrors.NewAggregate(pullErrs)
return "", nil, utilerrors.NewAggregate(pullErrs)
}
// GetImageRef gets the ID of the image which has already been in

View File

@ -21,15 +21,19 @@ import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/flowcontrol"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/utils/ptr"
)
func TestPullImage(t *testing.T) {
@ -37,9 +41,10 @@ func TestPullImage(t *testing.T) {
_, _, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
imageRef, creds, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
assert.NoError(t, err)
assert.Equal(t, "busybox", imageRef)
assert.Nil(t, creds) // as this was an anonymous pull
images, err := fakeManager.ListImages(ctx)
assert.NoError(t, err)
@ -52,36 +57,17 @@ func TestPullImageWithError(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
// trying to pull an image with an invalid name should return an error
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: ":invalid"}, nil, nil, "")
assert.Error(t, err)
assert.Equal(t, "", imageRef)
fakeImageService.InjectError("PullImage", fmt.Errorf("test-error"))
imageRef, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
imageRef, creds, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
assert.Error(t, err)
assert.Equal(t, "", imageRef)
assert.Nil(t, creds)
images, err := fakeManager.ListImages(ctx)
assert.NoError(t, err)
assert.Empty(t, images)
}
func TestPullImageWithInvalidImageName(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
imageList := []string{"FAIL", "http://fail", "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"}
fakeImageService.SetFakeImages(imageList)
for _, val := range imageList {
ctx := context.Background()
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: val}, nil, nil, "")
assert.Error(t, err)
assert.Equal(t, "", imageRef)
}
}
func TestListImages(t *testing.T) {
ctx := context.Background()
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
@ -196,7 +182,7 @@ func TestRemoveImage(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
_, _, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
assert.NoError(t, err)
assert.Len(t, fakeImageService.Images, 1)
@ -219,7 +205,7 @@ func TestRemoveImageWithError(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
assert.NoError(t, err)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil, "")
_, _, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: "busybox"}, nil, nil)
assert.NoError(t, err)
assert.Len(t, fakeImageService.Images, 1)
@ -280,13 +266,13 @@ func TestPullWithSecrets(t *testing.T) {
expectedAuth *runtimeapi.AuthConfig
}{
"no matching secrets": {
"ubuntu",
"ubuntu:latest",
[]v1.Secret{},
credentialprovider.DockerConfig(map[string]credentialprovider.DockerConfigEntry{}),
nil,
},
"default keyring secrets": {
"ubuntu",
"ubuntu:latest",
[]v1.Secret{},
credentialprovider.DockerConfig(map[string]credentialprovider.DockerConfigEntry{
"index.docker.io/v1/": {Username: "built-in", Password: "password", Provider: nil},
@ -294,7 +280,7 @@ func TestPullWithSecrets(t *testing.T) {
&runtimeapi.AuthConfig{Username: "built-in", Password: "password"},
},
"default keyring secrets unused": {
"ubuntu",
"ubuntu:latest",
[]v1.Secret{},
credentialprovider.DockerConfig(map[string]credentialprovider.DockerConfigEntry{
"extraneous": {Username: "built-in", Password: "password", Provider: nil},
@ -302,7 +288,7 @@ func TestPullWithSecrets(t *testing.T) {
nil,
},
"builtin keyring secrets, but use passed": {
"ubuntu",
"ubuntu:latest",
[]v1.Secret{{Type: v1.SecretTypeDockercfg, Data: map[string][]byte{v1.DockerConfigKey: dockercfgContent}}},
credentialprovider.DockerConfig(map[string]credentialprovider.DockerConfigEntry{
"index.docker.io/v1/": {Username: "built-in", Password: "password", Provider: nil},
@ -310,7 +296,7 @@ func TestPullWithSecrets(t *testing.T) {
&runtimeapi.AuthConfig{Username: "passed-user", Password: "passed-password"},
},
"builtin keyring secrets, but use passed with new docker config": {
"ubuntu",
"ubuntu:latest",
[]v1.Secret{{Type: v1.SecretTypeDockerConfigJson, Data: map[string][]byte{v1.DockerConfigJsonKey: dockerConfigJSONContent}}},
credentialprovider.DockerConfig(map[string]credentialprovider.DockerConfigEntry{
"index.docker.io/v1/": {Username: "built-in", Password: "password", Provider: nil},
@ -321,10 +307,23 @@ func TestPullWithSecrets(t *testing.T) {
for description, test := range tests {
builtInKeyRing := &credentialprovider.BasicDockerKeyring{}
builtInKeyRing.Add(nil, test.builtInDockerConfig)
_, fakeImageService, fakeManager, err := customTestRuntimeManager(builtInKeyRing)
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
require.NoError(t, err)
_, err = fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: test.imageName}, test.passedSecrets, nil, "")
fakeManager.imagePuller = images.NewImageManager(
fakeManager.recorder,
builtInKeyRing,
fakeManager,
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
ptr.To[int32](0), // No limit on max parallel image pulls,
0, // Disable image pull throttling by setting QPS to 0,
0,
&fakePodPullingTimeRecorder{},
)
_, _, err = fakeManager.imagePuller.EnsureImageExists(ctx, nil, makeTestPod("testpod", "testpod-ns", "testpod-uid", []v1.Container{}), test.imageName, test.passedSecrets, nil, "", v1.PullAlways)
require.NoError(t, err)
fakeImageService.AssertImagePulledWithAuth(t, &runtimeapi.ImageSpec{Image: test.imageName, Annotations: make(map[string]string)}, test.expectedAuth, description)
}
@ -355,12 +354,12 @@ func TestPullWithSecretsWithError(t *testing.T) {
}{
{
name: "invalid docker secret",
imageName: "ubuntu",
imageName: "ubuntu:latest",
passedSecrets: []v1.Secret{{Type: v1.SecretTypeDockercfg, Data: map[string][]byte{v1.DockerConfigKey: []byte("invalid")}}},
},
{
name: "secret provided, pull failed",
imageName: "ubuntu",
imageName: "ubuntu:latest",
passedSecrets: []v1.Secret{
{Type: v1.SecretTypeDockerConfigJson, Data: map[string][]byte{v1.DockerConfigKey: dockerConfigJSON}},
},
@ -375,7 +374,19 @@ func TestPullWithSecretsWithError(t *testing.T) {
fakeImageService.InjectError("PullImage", fmt.Errorf("test-error"))
}
imageRef, err := fakeManager.PullImage(ctx, kubecontainer.ImageSpec{Image: test.imageName}, test.passedSecrets, nil, "")
fakeManager.imagePuller = images.NewImageManager(
fakeManager.recorder,
&credentialprovider.BasicDockerKeyring{},
fakeManager,
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
ptr.To[int32](0), // No limit on max parallel image pulls,
0, // Disable image pull throttling by setting QPS to 0,
0,
&fakePodPullingTimeRecorder{},
)
imageRef, _, err := fakeManager.imagePuller.EnsureImageExists(ctx, nil, makeTestPod("testpod", "testpod-ns", "testpod-uid", []v1.Container{}), test.imageName, test.passedSecrets, nil, "", v1.PullAlways)
assert.Error(t, err)
assert.Equal(t, "", imageRef)
@ -398,7 +409,7 @@ func TestPullThenListWithAnnotations(t *testing.T) {
},
}
_, err = fakeManager.PullImage(ctx, imageSpec, nil, nil, "")
_, _, err = fakeManager.PullImage(ctx, imageSpec, nil, nil)
assert.NoError(t, err)
images, err := fakeManager.ListImages(ctx)

View File

@ -107,9 +107,6 @@ type kubeGenericRuntimeManager struct {
// Container GC manager
containerGC *containerGC
// Keyring for pulling images
keyring credentialprovider.DockerKeyring
// Runner of lifecycle events.
runner kubecontainer.HandlerRunner
@ -285,10 +282,12 @@ func NewKubeGenericRuntimeManager(
os.Exit(1)
}
}
kubeRuntimeManager.keyring = credentialprovider.NewDefaultDockerKeyring()
nodeKeyring := credentialprovider.NewDefaultDockerKeyring()
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),
nodeKeyring,
kubeRuntimeManager,
imageBackOff,
serializeImagePulls,

View File

@ -49,7 +49,6 @@ import (
apitest "k8s.io/cri-api/pkg/apis/testing"
crierror "k8s.io/cri-api/pkg/errors"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm"
cmtesting "k8s.io/kubernetes/pkg/kubelet/cm/testing"
@ -68,10 +67,6 @@ var (
)
func createTestRuntimeManager() (*apitest.FakeRuntimeService, *apitest.FakeImageService, *kubeGenericRuntimeManager, error) {
return customTestRuntimeManager(&credentialprovider.BasicDockerKeyring{})
}
func customTestRuntimeManager(keyring *credentialprovider.BasicDockerKeyring) (*apitest.FakeRuntimeService, *apitest.FakeImageService, *kubeGenericRuntimeManager, error) {
fakeRuntimeService := apitest.NewFakeRuntimeService()
fakeImageService := apitest.NewFakeImageService()
// Only an empty machineInfo is needed here, because in unit test all containers are besteffort,
@ -82,7 +77,7 @@ func customTestRuntimeManager(keyring *credentialprovider.BasicDockerKeyring) (*
MemoryCapacity: uint64(memoryCapacityQuantity.Value()),
}
osInterface := &containertest.FakeOS{}
manager, err := newFakeKubeRuntimeManager(fakeRuntimeService, fakeImageService, machineInfo, osInterface, &containertest.FakeRuntimeHelper{}, keyring, noopoteltrace.NewTracerProvider().Tracer(""))
manager, err := newFakeKubeRuntimeManager(fakeRuntimeService, fakeImageService, machineInfo, osInterface, &containertest.FakeRuntimeHelper{}, noopoteltrace.NewTracerProvider().Tracer(""))
return fakeRuntimeService, fakeImageService, manager, err
}