kubelet: integrate the image pull manager

This commit is contained in:
Stanislav Láznička 2024-10-16 17:17:45 +02:00
parent b3befff631
commit 3793becbb9
No known key found for this signature in database
GPG Key ID: F8D8054395A1D157
7 changed files with 418 additions and 62 deletions

View File

@@ -36,6 +36,7 @@ import (
credentialproviderplugin "k8s.io/kubernetes/pkg/credentialprovider/plugin"
credentialprovidersecrets "k8s.io/kubernetes/pkg/credentialprovider/secrets"
"k8s.io/kubernetes/pkg/features"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -49,10 +50,11 @@ type ImagePodPullingTimeRecorder interface {
// imageManager provides the functionality for image pulling.
type imageManager struct {
recorder record.EventRecorder
imageService kubecontainer.ImageService
backOff *flowcontrol.Backoff
prevPullErrMsg sync.Map
recorder record.EventRecorder
imageService kubecontainer.ImageService
imagePullManager ImagePullManager
backOff *flowcontrol.Backoff
prevPullErrMsg sync.Map
// It will check the presence of the image, and report the 'image pulling' and 'image pulled' events correspondingly.
puller imagePuller
@@ -64,7 +66,19 @@ type imageManager struct {
var _ ImageManager = &imageManager{}
// NewImageManager instantiates a new ImageManager object.
func NewImageManager(recorder record.EventRecorder, nodeKeyring credentialprovider.DockerKeyring, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, maxParallelImagePulls *int32, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager {
func NewImageManager(
recorder record.EventRecorder,
nodeKeyring credentialprovider.DockerKeyring,
imageService kubecontainer.ImageService,
imagePullManager ImagePullManager,
imageBackOff *flowcontrol.Backoff,
serialized bool,
maxParallelImagePulls *int32,
qps float32,
burst int,
podPullingTimeRecorder ImagePodPullingTimeRecorder,
) ImageManager {
imageService = throttleImagePulling(imageService, qps, burst)
var puller imagePuller
@@ -76,6 +90,7 @@ func NewImageManager(recorder record.EventRecorder, nodeKeyring credentialprovid
return &imageManager{
recorder: recorder,
imageService: imageService,
imagePullManager: imagePullManager,
nodeKeyring: nodeKeyring,
backOff: imageBackOff,
puller: puller,
@@ -85,33 +100,25 @@ func NewImageManager(recorder record.EventRecorder, nodeKeyring credentialprovid
// imagePullPrecheck inspects the pull policy and checks for image presence accordingly,
// returning (imageRef, error msg, err) and logging any errors.
func (m *imageManager) imagePullPrecheck(ctx context.Context, objRef *v1.ObjectReference, logPrefix string, pullPolicy v1.PullPolicy, spec *kubecontainer.ImageSpec, imgRef string) (imageRef string, msg string, err error) {
func (m *imageManager) imagePullPrecheck(ctx context.Context, objRef *v1.ObjectReference, logPrefix string, pullPolicy v1.PullPolicy, spec *kubecontainer.ImageSpec, requestedImage string) (imageRef string, msg string, err error) {
switch pullPolicy {
case v1.PullAlways:
return "", msg, nil
case v1.PullIfNotPresent:
case v1.PullIfNotPresent, v1.PullNever:
imageRef, err = m.imageService.GetImageRef(ctx, *spec)
if err != nil {
msg = fmt.Sprintf("Failed to inspect image %q: %v", imageRef, err)
m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
return "", msg, ErrImageInspect
}
return imageRef, msg, nil
case v1.PullNever:
imageRef, err = m.imageService.GetImageRef(ctx, *spec)
if err != nil {
msg = fmt.Sprintf("Failed to inspect image %q: %v", imageRef, err)
m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
return "", msg, ErrImageInspect
}
if imageRef == "" {
msg = fmt.Sprintf("Container image %q is not present with pull policy of Never", imgRef)
m.logIt(objRef, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning)
return "", msg, ErrImageNeverPull
}
return imageRef, msg, nil
}
return
if len(imageRef) == 0 && pullPolicy == v1.PullNever {
msg, err = m.imageNotPresentOnNeverPolicyError(logPrefix, objRef, requestedImage)
return "", msg, err
}
return imageRef, msg, nil
}
// logIt records an event using objRef and event msg, and logs it via klog using prefix, msg, and logFn
@@ -123,15 +130,30 @@ func (m *imageManager) logIt(objRef *v1.ObjectReference, eventtype, event, prefi
}
}
// EnsureImageExists pulls the image for the specified pod and imgRef, and returns
// imageNotPresentOnNeverPolicyError is a utility function that emits an event about
// an image not being present and returns the appropriate error to be passed on.
//
// Called in 2 scenarios:
// 1. image is not present with `imagePullPolicy: Never`
// 2. image is present but cannot be accessed with the presented set of credentials
//
// We don't want to reveal the presence of an image if it cannot be accessed, hence we
// want the same behavior in both the above scenarios.
func (m *imageManager) imageNotPresentOnNeverPolicyError(logPrefix string, objRef *v1.ObjectReference, requestedImage string) (string, error) {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", requestedImage)
m.logIt(objRef, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning)
return msg, ErrImageNeverPull
}
// EnsureImageExists pulls the image for the specified pod and requestedImage, and returns
// (imageRef, error message, error).
func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectReference, pod *v1.Pod, imgRef string, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string, pullPolicy v1.PullPolicy) (imageRef, message string, err error) {
logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, imgRef)
func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectReference, pod *v1.Pod, requestedImage string, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string, pullPolicy v1.PullPolicy) (imageRef, message string, err error) {
logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, requestedImage)
// If the image contains no tag or digest, a default tag should be applied.
image, err := applyDefaultImageTag(imgRef)
image, err := applyDefaultImageTag(requestedImage)
if err != nil {
msg := fmt.Sprintf("Failed to apply default image tag %q: %v", imgRef, err)
msg := fmt.Sprintf("Failed to apply default image tag %q: %v", requestedImage, err)
m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
return "", msg, ErrInvalidImageName
}
@@ -150,18 +172,12 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
RuntimeHandler: podRuntimeHandler,
}
imageRef, message, err = m.imagePullPrecheck(ctx, objRef, logPrefix, pullPolicy, &spec, imgRef)
imageRef, message, err = m.imagePullPrecheck(ctx, objRef, logPrefix, pullPolicy, &spec, requestedImage)
if err != nil {
return "", message, err
}
if imageRef != "" {
msg := fmt.Sprintf("Container image %q already present on machine", imgRef)
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
return imageRef, msg, nil
}
img := spec.Image
repoToPull, _, _, err := parsers.ParseImageName(img)
repoToPull, _, _, err := parsers.ParseImageName(spec.Image)
if err != nil {
return "", err.Error(), err
}
@@ -188,13 +204,67 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
}
pullCredentials, _ := keyring.Lookup(repoToPull)
return m.pullImage(ctx, logPrefix, objRef, pod.UID, imgRef, spec, pullCredentials, podSandboxConfig)
if imageRef != "" {
if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
msg := fmt.Sprintf("Container image %q already present on machine", requestedImage)
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
return imageRef, msg, nil
}
var imagePullSecrets []kubeletconfiginternal.ImagePullSecret
for _, s := range pullCredentials {
if s.Source == nil {
// we're only interested in creds that are not node accessible
continue
}
imagePullSecrets = append(imagePullSecrets, kubeletconfiginternal.ImagePullSecret{
UID: string(s.Source.Secret.UID),
Name: s.Source.Secret.Name,
Namespace: s.Source.Secret.Namespace,
CredentialHash: s.AuthConfigHash,
})
}
pullRequired := m.imagePullManager.MustAttemptImagePull(requestedImage, imageRef, imagePullSecrets)
if !pullRequired {
msg := fmt.Sprintf("Container image %q already present on machine and can be accessed by the pod", requestedImage)
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
return imageRef, msg, nil
}
}
if pullPolicy == v1.PullNever {
// The image is present as confirmed by imagePullPrecheck, but according to the
// imagePullManager's credential check it cannot be accessed with the presented credentials.
msg, err := m.imageNotPresentOnNeverPolicyError(logPrefix, objRef, requestedImage)
return "", msg, err
}
return m.pullImage(ctx, logPrefix, objRef, pod.UID, requestedImage, spec, pullCredentials, podSandboxConfig)
}
func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *v1.ObjectReference, podUID types.UID, imgRef string, imgSpec kubecontainer.ImageSpec, pullCredentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (imageRef, message string, err error) {
backOffKey := fmt.Sprintf("%s_%s", podUID, imgRef)
func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *v1.ObjectReference, podUID types.UID, image string, imgSpec kubecontainer.ImageSpec, pullCredentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (imageRef, message string, err error) {
var pullSucceeded bool
var finalPullCredentials *credentialprovider.TrackedAuthConfig
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
if err := m.imagePullManager.RecordPullIntent(image); err != nil {
return "", fmt.Sprintf("Failed to record image pull intent for container image %q: %v", image, err), err
}
defer func() {
if pullSucceeded {
m.imagePullManager.RecordImagePulled(image, imageRef, trackedToImagePullCreds(finalPullCredentials))
} else {
m.imagePullManager.RecordImagePullFailed(image)
}
}()
}
backOffKey := fmt.Sprintf("%s_%s", podUID, image)
if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", imgRef)
msg := fmt.Sprintf("Back-off pulling image %q", image)
m.logIt(objRef, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
// Wrap the error from the actual pull if available.
@@ -211,16 +281,16 @@ func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *
m.prevPullErrMsg.Delete(backOffKey)
m.podPullingTimeRecorder.RecordImageStartedPulling(podUID)
m.logIt(objRef, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", imgRef), klog.Info)
m.logIt(objRef, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", image), klog.Info)
startTime := time.Now()
pullChan := make(chan pullResult)
m.puller.pullImage(ctx, imgSpec, pullCredentials, pullChan, podSandboxConfig)
imagePullResult := <-pullChan
if imagePullResult.err != nil {
m.logIt(objRef, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", imgRef, imagePullResult.err), klog.Warning)
m.logIt(objRef, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", image, imagePullResult.err), klog.Warning)
m.backOff.Next(backOffKey, m.backOff.Clock.Now())
msg, err := evalCRIPullErr(imgRef, imagePullResult.err)
msg, err := evalCRIPullErr(image, imagePullResult.err)
// Store the actual pull error for providing that information during
// the image pull back-off.
@@ -231,9 +301,11 @@ func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *
m.podPullingTimeRecorder.RecordImageFinishedPulling(podUID)
imagePullDuration := time.Since(startTime).Truncate(time.Millisecond)
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v (%v including waiting). Image size: %v bytes.",
imgRef, imagePullResult.pullDuration.Truncate(time.Millisecond), imagePullDuration, imagePullResult.imageSize), klog.Info)
image, imagePullResult.pullDuration.Truncate(time.Millisecond), imagePullDuration, imagePullResult.imageSize), klog.Info)
metrics.ImagePullDuration.WithLabelValues(metrics.GetImageSizeBucket(imagePullResult.imageSize)).Observe(imagePullDuration.Seconds())
m.backOff.GC()
finalPullCredentials = imagePullResult.credentialsUsed
pullSucceeded = true
return imagePullResult.imageRef, "", nil
}
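For orientation, the ImagePullManager surface this commit consumes can be reconstructed from the call sites above. Below is a minimal sketch, with signatures inferred from usage in this diff rather than taken from the authoritative definition elsewhere in the kubelet's images package:

```go
// Sketch only: method set inferred from the call sites in this file.
type ImagePullManager interface {
	// RecordPullIntent persists the intent to pull an image before the pull
	// starts, so that an interrupted pull cannot pass for a verified one.
	RecordPullIntent(image string) error
	// RecordImagePulled stores which credentials successfully pulled the image.
	RecordImagePulled(image, imageRef string, credentials *kubeletconfiginternal.ImagePullCredentials)
	// RecordImagePullFailed records that an attempted pull did not succeed.
	RecordImagePullFailed(image string)
	// MustAttemptImagePull reports whether a pull must be attempted even though
	// imageRef is already present on the node, based on the pod's pull secrets.
	MustAttemptImagePull(image, imageRef string, podSecrets []kubeletconfiginternal.ImagePullSecret) bool
}
```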
@@ -287,3 +359,23 @@ func applyDefaultImageTag(image string) (string, error) {
}
return image, nil
}
func trackedToImagePullCreds(trackedCreds *credentialprovider.TrackedAuthConfig) *kubeletconfiginternal.ImagePullCredentials {
ret := &kubeletconfiginternal.ImagePullCredentials{}
switch {
case trackedCreds == nil, trackedCreds.Source == nil:
ret.NodePodsAccessible = true
default:
sourceSecret := trackedCreds.Source.Secret
ret.KubernetesSecrets = []kubeletconfiginternal.ImagePullSecret{
{
UID: sourceSecret.UID,
Name: sourceSecret.Name,
Namespace: sourceSecret.Namespace,
CredentialHash: trackedCreds.AuthConfigHash,
},
}
}
return ret
}
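The kubeletconfiginternal types referenced above are not part of this diff. Their approximate shape, as implied by the fields used in this file (a sketch, not the canonical definition from k8s.io/kubernetes/pkg/kubelet/apis/config):

```go
// Sketch only: field sets implied by the references in this file.
type ImagePullSecret struct {
	UID       string
	Name      string
	Namespace string
	// CredentialHash presumably fingerprints the secret's auth data so that
	// rotated credentials are not treated as already-verified ones.
	CredentialHash string
}

type ImagePullCredentials struct {
	// NodePodsAccessible marks images pulled with node-level credentials
	// (or none), which any pod on the node may use.
	NodePodsAccessible bool
	KubernetesSecrets  []ImagePullSecret
}
```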

View File

@@ -30,13 +30,16 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
crierrors "k8s.io/cri-api/pkg/errors"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/features"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
. "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/test/utils/ktesting"
@@ -54,17 +57,29 @@ type pullerExpects struct {
}
type pullerTestCase struct {
testName string
containerImage string
policy v1.PullPolicy
inspectErr error
pullerErr error
qps float32
burst int
expected []pullerExpects
testName string
containerImage string
policy v1.PullPolicy
pullSecrets []v1.Secret
allowedCredentials map[string][]kubeletconfiginternal.ImagePullSecret // image -> allowedCredentials; nil means allow all
inspectErr error
pullerErr error
qps float32
burst int
expected []pullerExpects
enableFeatures []featuregate.Feature
}
func pullerTestCases() []pullerTestCase {
return append(
noFGPullerTestCases(),
ensureSecretImagesTestCases()...,
)
}
// noFGPullerTestCases returns all the test cases that exercise the default behavior,
// with no feature gate required.
func noFGPullerTestCases() []pullerTestCase {
return []pullerTestCase{
{ // pull missing image
testName: "image missing, pull",
@@ -83,7 +98,7 @@ func pullerTestCases() []pullerTestCase {
}},
{ // image present, don't pull
testName: "image present, don't pull ",
testName: "image present, allow all, don't pull ",
containerImage: "present_image",
policy: v1.PullIfNotPresent,
inspectErr: nil,
@@ -336,6 +351,142 @@ func pullerTestCases() []pullerTestCase {
}
}
// ensureSecretImagesTestCases returns test cases specific to the KubeletEnsureSecretPulledImages
// feature gate, plus a copy of all non-feature-gated tests that additionally request
// the feature gate to be enabled.
func ensureSecretImagesTestCases() []pullerTestCase {
testCases := []pullerTestCase{
{
testName: "[KubeletEnsureSecretPulledImages] image present, unknown to image pull manager, pull",
containerImage: "present_image",
policy: v1.PullIfNotPresent,
allowedCredentials: map[string][]kubeletconfiginternal.ImagePullSecret{
"another_image": {{Namespace: "testns", Name: "testname", UID: "testuid"}},
},
pullSecrets: []v1.Secret{makeDockercfgSecretForRepo(metav1.ObjectMeta{Namespace: "testns", Name: "testname", UID: "testuid"}, "docker.io/library/present_image")},
inspectErr: nil,
pullerErr: nil,
qps: 0.0,
burst: 0,
enableFeatures: []featuregate.Feature{features.KubeletEnsureSecretPulledImages},
expected: []pullerExpects{
{[]string{"GetImageRef", "PullImage", "GetImageSize"}, nil, true, true,
[]v1.Event{
{Reason: "Pulling"},
{Reason: "Pulled"},
}, ""},
{[]string{"GetImageRef", "PullImage", "GetImageSize"}, nil, true, true,
[]v1.Event{
{Reason: "Pulling"},
{Reason: "Pulled"},
}, ""},
{[]string{"GetImageRef", "PullImage", "GetImageSize"}, nil, true, true,
[]v1.Event{
{Reason: "Pulling"},
{Reason: "Pulled"},
}, ""},
}},
{
testName: "[KubeletEnsureSecretPulledImages] image present, unknown secret to image pull manager, pull",
containerImage: "present_image",
policy: v1.PullIfNotPresent,
allowedCredentials: map[string][]kubeletconfiginternal.ImagePullSecret{
"present_image": {{Namespace: "testns", Name: "testname", UID: "testuid"}},
},
pullSecrets: []v1.Secret{makeDockercfgSecretForRepo(metav1.ObjectMeta{Namespace: "testns", Name: "testname", UID: "someothertestuid"}, "docker.io/library/present_image")},
inspectErr: nil,
pullerErr: nil,
qps: 0.0,
burst: 0,
enableFeatures: []featuregate.Feature{features.KubeletEnsureSecretPulledImages},
expected: []pullerExpects{
{[]string{"GetImageRef", "PullImage", "GetImageSize"}, nil, true, true,
[]v1.Event{
{Reason: "Pulling"},
{Reason: "Pulled"},
}, ""},
{[]string{"GetImageRef", "PullImage", "GetImageSize"}, nil, true, true,
[]v1.Event{
{Reason: "Pulling"},
{Reason: "Pulled"},
}, ""},
{[]string{"GetImageRef", "PullImage", "GetImageSize"}, nil, true, true,
[]v1.Event{
{Reason: "Pulling"},
{Reason: "Pulled"},
}, ""},
},
},
{
testName: "[KubeletEnsureSecretPulledImages] image present, unknown secret to image pull manager, never pull policy -> fail",
containerImage: "present_image",
policy: v1.PullNever,
allowedCredentials: map[string][]kubeletconfiginternal.ImagePullSecret{
"present_image": {{Namespace: "testns", Name: "testname", UID: "testuid"}},
},
pullSecrets: []v1.Secret{makeDockercfgSecretForRepo(metav1.ObjectMeta{Namespace: "testns", Name: "testname", UID: "someothertestuid"}, "docker.io/library/present_image")},
inspectErr: nil,
pullerErr: nil,
qps: 0.0,
burst: 0,
enableFeatures: []featuregate.Feature{features.KubeletEnsureSecretPulledImages},
expected: []pullerExpects{
{[]string{"GetImageRef"}, ErrImageNeverPull, false, false,
[]v1.Event{
{Reason: "ErrImageNeverPull"},
}, ""},
{[]string{"GetImageRef"}, ErrImageNeverPull, false, false,
[]v1.Event{
{Reason: "ErrImageNeverPull"},
}, ""},
{[]string{"GetImageRef"}, ErrImageNeverPull, false, false,
[]v1.Event{
{Reason: "ErrImageNeverPull"},
}, ""},
},
},
{
testName: "[KubeletEnsureSecretPulledImages] image present, a secret matches one of known to the image pull manager, don't pull",
containerImage: "present_image",
policy: v1.PullIfNotPresent,
allowedCredentials: map[string][]kubeletconfiginternal.ImagePullSecret{
"present_image": {{Namespace: "testns", Name: "testname", UID: "testuid"}},
},
pullSecrets: []v1.Secret{
makeDockercfgSecretForRepo(metav1.ObjectMeta{Namespace: "testns", Name: "testname", UID: "someothertestuid"}, "docker.io/library/present_image"),
makeDockercfgSecretForRepo(metav1.ObjectMeta{Namespace: "testns", Name: "testname", UID: "testuid"}, "docker.io/library/present_image"),
},
inspectErr: nil,
pullerErr: nil,
qps: 0.0,
burst: 0,
enableFeatures: []featuregate.Feature{features.KubeletEnsureSecretPulledImages},
expected: []pullerExpects{
{[]string{"GetImageRef"}, nil, false, false,
[]v1.Event{
{Reason: "Pulled"},
}, ""},
{[]string{"GetImageRef"}, nil, false, false,
[]v1.Event{
{Reason: "Pulled"},
}, ""},
{[]string{"GetImageRef"}, nil, false, false,
[]v1.Event{
{Reason: "Pulled"},
}, ""},
},
},
}
for _, tc := range noFGPullerTestCases() {
tc.testName = "[KubeletEnsureSecretPulledImages] " + tc.testName
tc.enableFeatures = append(tc.enableFeatures, features.KubeletEnsureSecretPulledImages)
testCases = append(testCases, tc)
}
return testCases
}
type mockPodPullingTimeRecorder struct {
sync.Mutex
startedPullingRecorded bool
@@ -361,7 +512,46 @@ func (m *mockPodPullingTimeRecorder) reset() {
m.finishedPullingRecorded = false
}
func pullerTestEnv(t *testing.T, c pullerTestCase, serialized bool, maxParallelImagePulls *int32) (puller ImageManager, fakeClock *testingclock.FakeClock, fakeRuntime *ctest.FakeRuntime, container *v1.Container, fakePodPullingTimeRecorder *mockPodPullingTimeRecorder, fakeRecorder *testutil.FakeRecorder) {
type mockImagePullManager struct {
NoopImagePullManager
imageAllowlist map[string]sets.Set[kubeletconfiginternal.ImagePullSecret]
allowAll bool
}
func (m *mockImagePullManager) MustAttemptImagePull(image, _ string, podSecrets []kubeletconfiginternal.ImagePullSecret) bool {
if m.allowAll {
return false
}
cachedSecrets, ok := m.imageAllowlist[image]
if !ok {
return true
}
// ignore the credential hashes and match only on the secret coordinates, to simplify testing
for _, s := range podSecrets {
if cachedSecrets.Has(kubeletconfiginternal.ImagePullSecret{Namespace: s.Namespace, Name: s.Name, UID: s.UID}) {
return false
}
}
return true
}
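Since the boolean's polarity is easy to misread (true means the kubelet must still attempt a pull), here is a hypothetical call against the mock, mirroring the "unknown secret" test case above:

```go
// Hypothetical usage: the pod presents a secret whose UID differs from the
// allowlisted one, so it does not match and a pull must be attempted.
mgr := &mockImagePullManager{
	imageAllowlist: map[string]sets.Set[kubeletconfiginternal.ImagePullSecret]{
		"present_image": sets.New(kubeletconfiginternal.ImagePullSecret{
			Namespace: "testns", Name: "testname", UID: "testuid",
		}),
	},
}
mustPull := mgr.MustAttemptImagePull("present_image", "imageRef", []kubeletconfiginternal.ImagePullSecret{
	{Namespace: "testns", Name: "testname", UID: "someothertestuid"},
})
// mustPull == true
```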
func pullerTestEnv(
t *testing.T,
c pullerTestCase,
serialized bool,
maxParallelImagePulls *int32,
) (
puller ImageManager,
fakeClock *testingclock.FakeClock,
fakeRuntime *ctest.FakeRuntime,
container *v1.Container,
fakePodPullingTimeRecorder *mockPodPullingTimeRecorder,
fakeRecorder *testutil.FakeRecorder,
) {
container = &v1.Container{
Name: "container_name",
Image: c.containerImage,
@@ -381,7 +571,19 @@ func pullerTestEnv(t *testing.T, c pullerTestCase, serialized bool, maxParallelI
fakePodPullingTimeRecorder = &mockPodPullingTimeRecorder{}
puller = NewImageManager(fakeRecorder, &credentialprovider.BasicDockerKeyring{}, fakeRuntime, backOff, serialized, maxParallelImagePulls, c.qps, c.burst, fakePodPullingTimeRecorder)
pullManager := &mockImagePullManager{allowAll: true}
if c.allowedCredentials != nil {
pullManager.allowAll = false
pullManager.imageAllowlist = make(map[string]sets.Set[kubeletconfiginternal.ImagePullSecret])
for image, secrets := range c.allowedCredentials {
pullManager.imageAllowlist[image] = sets.New(secrets...)
}
}
for _, fg := range c.enableFeatures {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, fg, true)
}
puller = NewImageManager(fakeRecorder, &credentialprovider.BasicDockerKeyring{}, fakeRuntime, pullManager, backOff, serialized, maxParallelImagePulls, c.qps, c.burst, fakePodPullingTimeRecorder)
return
}
@@ -406,7 +608,7 @@ func TestParallelPuller(t *testing.T) {
fakeRuntime.CalledFunctions = nil
fakeClock.Step(time.Second)
_, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
_, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(expected.calls)
assert.Equal(t, expected.err, err)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@@ -439,7 +641,7 @@ func TestSerializedPuller(t *testing.T) {
fakeRuntime.CalledFunctions = nil
fakeClock.Step(time.Second)
_, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
_, msg, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(expected.calls)
assert.Equal(t, expected.err, err)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@@ -503,7 +705,7 @@ func TestPullAndListImageWithPodAnnotations(t *testing.T) {
fakeRuntime.ImageList = []Image{}
fakeClock.Step(time.Second)
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(c.expected[0].calls)
assert.Equal(t, c.expected[0].err, err, "tick=%d", 0)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@@ -560,7 +762,7 @@ func TestPullAndListImageWithRuntimeHandlerInImageCriAPIFeatureGate(t *testing.T
fakeRuntime.ImageList = []Image{}
fakeClock.Step(time.Second)
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, runtimeHandler, container.ImagePullPolicy)
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, c.pullSecrets, nil, runtimeHandler, container.ImagePullPolicy)
fakeRuntime.AssertCalls(c.expected[0].calls)
assert.Equal(t, c.expected[0].err, err, "tick=%d", 0)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@@ -619,7 +821,7 @@ func TestMaxParallelImagePullsLimit(t *testing.T) {
for i := 0; i < maxParallelImagePulls; i++ {
wg.Add(1)
go func() {
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, testCase.pullSecrets, nil, "", container.ImagePullPolicy)
assert.NoError(t, err)
wg.Done()
}()
@@ -631,7 +833,7 @@ func TestMaxParallelImagePullsLimit(t *testing.T) {
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
_, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, testCase.pullSecrets, nil, "", container.ImagePullPolicy)
assert.NoError(t, err)
wg.Done()
}()
@@ -732,7 +934,7 @@ func TestImagePullPrecheck(t *testing.T) {
fakeRecorder.Events = []*v1.Event{}
fakeClock.Step(time.Second)
_, _, err := puller.EnsureImageExists(ctx, &v1.ObjectReference{}, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
_, _, err := puller.EnsureImageExists(ctx, &v1.ObjectReference{}, pod, container.Image, c.pullSecrets, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(expected.calls)
var recorderEvents []v1.Event
for _, event := range fakeRecorder.Events {
@@ -748,3 +950,13 @@ func TestImagePullPrecheck(t *testing.T) {
})
}
}
func makeDockercfgSecretForRepo(sMeta metav1.ObjectMeta, repo string) v1.Secret {
return v1.Secret{
ObjectMeta: sMeta,
Type: v1.SecretTypeDockerConfigJson,
Data: map[string][]byte{
v1.DockerConfigJsonKey: []byte(`{"auths": {"` + repo + `": {"auth": "dXNlcjpwYXNzd29yZA=="}}}`),
},
}
}
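The hardcoded auth value in the dockercfg payload is simply the base64 encoding of user:password basic-auth credentials; a standalone check:

```go
// Standalone sanity check (not part of the test file): decode the embedded
// dockercfg auth string back to its plaintext form.
package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	creds, err := base64.StdEncoding.DecodeString("dXNlcjpwYXNzd29yZA==")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(creds)) // prints: user:password
}
```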

View File

@@ -776,6 +776,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeCfg.MaxParallelImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
kubeCfg.ImagePullCredentialsVerificationPolicy,
kubeCfg.PreloadedImagesVerificationAllowlist,
imageCredentialProviderConfigFile,
imageCredentialProviderBinDir,
singleProcessOOMKill,

View File

@@ -3411,6 +3411,8 @@ func TestSyncPodSpans(t *testing.T) {
kubeCfg.MaxParallelImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
string(kubeletconfiginternal.NeverVerify),
nil,
"",
"",
nil,

View File

@@ -135,6 +135,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
kubecontainer.FilterEventRecorder(recorder),
&credentialprovider.BasicDockerKeyring{},
kubeRuntimeManager,
&images.NoopImagePullManager{},
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
ptr.To[int32](0), // No limit on max parallel image pulls,

View File

@@ -311,10 +311,21 @@ func TestPullWithSecrets(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
require.NoError(t, err)
fsRecordAccessor, err := images.NewFSPullRecordsAccessor(t.TempDir())
if err != nil {
t.Fatal("failed to setup an file pull records accessor")
}
imagePullManager, err := images.NewImagePullManager(context.Background(), fsRecordAccessor, images.AlwaysVerifyImagePullPolicy(), fakeManager, 10)
if err != nil {
t.Fatal("failed to setup an image pull manager")
}
fakeManager.imagePuller = images.NewImageManager(
fakeManager.recorder,
builtInKeyRing,
fakeManager,
imagePullManager,
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
ptr.To[int32](0), // No limit on max parallel image pulls,
@@ -374,10 +385,21 @@ func TestPullWithSecretsWithError(t *testing.T) {
fakeImageService.InjectError("PullImage", fmt.Errorf("test-error"))
}
fsRecordAccessor, err := images.NewFSPullRecordsAccessor(t.TempDir())
if err != nil {
t.Fatal("failed to setup an file pull records accessor")
}
imagePullManager, err := images.NewImagePullManager(context.Background(), fsRecordAccessor, images.AlwaysVerifyImagePullPolicy(), fakeManager, 10)
if err != nil {
t.Fatal("failed to setup an image pull manager")
}
fakeManager.imagePuller = images.NewImageManager(
fakeManager.recorder,
&credentialprovider.BasicDockerKeyring{},
fakeManager,
imagePullManager,
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
ptr.To[int32](0), // No limit on max parallel image pulls,

View File

@@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/credentialprovider/plugin"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/allocation"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
@@ -67,6 +68,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format"
sc "k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/utils/ptr"
)
const (
@@ -204,6 +206,8 @@ func NewKubeGenericRuntimeManager(
maxParallelImagePulls *int32,
imagePullQPS float32,
imagePullBurst int,
imagePullsCredentialVerificationPolicy string,
preloadedImagesCredentialVerificationWhitelist []string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
singleProcessOOMKill *bool,
@@ -283,12 +287,33 @@ func NewKubeGenericRuntimeManager(
}
}
nodeKeyring := credentialprovider.NewDefaultDockerKeyring()
var imagePullManager images.ImagePullManager = &images.NoopImagePullManager{}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
imagePullCredentialsVerificationPolicy, err := images.NewImagePullCredentialVerificationPolicy(
kubeletconfiginternal.ImagePullCredentialsVerificationPolicy(imagePullsCredentialVerificationPolicy),
preloadedImagesCredentialVerificationWhitelist)
if err != nil {
return nil, err
}
fsRecordAccessor, err := images.NewFSPullRecordsAccessor(rootDirectory)
if err != nil {
return nil, fmt.Errorf("failed to setup the FSPullRecordsAccessor: %w", err)
}
imagePullManager, err = images.NewImagePullManager(ctx, fsRecordAccessor, imagePullCredentialsVerificationPolicy, kubeRuntimeManager, ptr.Deref(maxParallelImagePulls, 0))
if err != nil {
return nil, fmt.Errorf("failed to create image pull manager: %w", err)
}
}
nodeKeyring := credentialprovider.NewDefaultDockerKeyring()
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),
nodeKeyring,
kubeRuntimeManager,
imagePullManager,
imageBackOff,
serializeImagePulls,
maxParallelImagePulls,