implement a file-based image pull manager

This commit is contained in:
Stanislav Láznička 2024-10-16 17:17:29 +02:00
parent 0ca2333846
commit b3befff631
No known key found for this signature in database
GPG Key ID: F8D8054395A1D157
12 changed files with 1813 additions and 0 deletions

View File

@ -0,0 +1,302 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"bytes"
"crypto/sha256"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/errors"
kubeletconfigv1alpha1 "k8s.io/kubelet/config/v1alpha1"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletconfigvint1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1alpha1"
)
const (
cacheFilesSHA256Prefix = "sha256-"
tmpFilesSuffix = ".tmp"
)
var _ PullRecordsAccessor = &fsPullRecordsAccessor{}
// fsPullRecordsAccessor uses the filesystem to read/write ImagePullIntent/ImagePulledRecord
// records.
type fsPullRecordsAccessor struct {
pullingDir string
pulledDir string
encoder runtime.Encoder
decoder runtime.Decoder
}
// NewFSPullRecordsAccessor returns an accessor for the ImagePullIntent/ImagePulledRecord
// records with a filesystem as the backing database.
func NewFSPullRecordsAccessor(kubeletDir string) (*fsPullRecordsAccessor, error) {
kubeletConfigEncoder, kubeletConfigDecoder, err := createKubeletConfigSchemeEncoderDecoder()
if err != nil {
return nil, err
}
accessor := &fsPullRecordsAccessor{
pullingDir: filepath.Join(kubeletDir, "image_manager", "pulling"),
pulledDir: filepath.Join(kubeletDir, "image_manager", "pulled"),
encoder: kubeletConfigEncoder,
decoder: kubeletConfigDecoder,
}
if err := os.MkdirAll(accessor.pullingDir, 0700); err != nil {
return nil, err
}
if err := os.MkdirAll(accessor.pulledDir, 0700); err != nil {
return nil, err
}
return accessor, nil
}
func (f *fsPullRecordsAccessor) WriteImagePullIntent(image string) error {
intent := kubeletconfiginternal.ImagePullIntent{
Image: image,
}
intentBytes := bytes.NewBuffer([]byte{})
if err := f.encoder.Encode(&intent, intentBytes); err != nil {
return err
}
return writeFile(f.pullingDir, cacheFilename(image), intentBytes.Bytes())
}
func (f *fsPullRecordsAccessor) ListImagePullIntents() ([]*kubeletconfiginternal.ImagePullIntent, error) {
var intents []*kubeletconfiginternal.ImagePullIntent
// walk the pulling directory for any pull intent records
err := processDirFiles(f.pullingDir,
func(filePath string, fileContent []byte) error {
intent, err := decodeIntent(f.decoder, fileContent)
if err != nil {
return fmt.Errorf("failed to deserialize content of file %q into ImagePullIntent: %w", filePath, err)
}
intents = append(intents, intent)
return nil
})
return intents, err
}
func (f *fsPullRecordsAccessor) ImagePullIntentExists(image string) (bool, error) {
intentRecordPath := filepath.Join(f.pullingDir, cacheFilename(image))
intentBytes, err := os.ReadFile(intentRecordPath)
if os.IsNotExist(err) {
return false, nil
} else if err != nil {
return false, err
}
intent, err := decodeIntent(f.decoder, intentBytes)
if err != nil {
return false, err
}
return intent.Image == image, nil
}
func (f *fsPullRecordsAccessor) DeleteImagePullIntent(image string) error {
err := os.Remove(filepath.Join(f.pullingDir, cacheFilename(image)))
if os.IsNotExist(err) {
return nil
}
return err
}
func (f *fsPullRecordsAccessor) GetImagePulledRecord(imageRef string) (*kubeletconfiginternal.ImagePulledRecord, bool, error) {
recordBytes, err := os.ReadFile(filepath.Join(f.pulledDir, cacheFilename(imageRef)))
if os.IsNotExist(err) {
return nil, false, nil
} else if err != nil {
return nil, false, err
}
pulledRecord, err := decodePulledRecord(f.decoder, recordBytes)
if err != nil {
return nil, true, err
}
if pulledRecord.ImageRef != imageRef {
return nil, false, nil
}
return pulledRecord, true, err
}
func (f *fsPullRecordsAccessor) ListImagePulledRecords() ([]*kubeletconfiginternal.ImagePulledRecord, error) {
var pullRecords []*kubeletconfiginternal.ImagePulledRecord
err := processDirFiles(f.pulledDir,
func(filePath string, fileContent []byte) error {
pullRecord, err := decodePulledRecord(f.decoder, fileContent)
if err != nil {
return fmt.Errorf("failed to deserialize content of file %q into ImagePulledRecord: %w", filePath, err)
}
pullRecords = append(pullRecords, pullRecord)
return nil
})
return pullRecords, err
}
func (f *fsPullRecordsAccessor) WriteImagePulledRecord(pulledRecord *kubeletconfiginternal.ImagePulledRecord) error {
recordBytes := bytes.NewBuffer([]byte{})
if err := f.encoder.Encode(pulledRecord, recordBytes); err != nil {
return fmt.Errorf("failed to serialize ImagePulledRecord: %w", err)
}
return writeFile(f.pulledDir, cacheFilename(pulledRecord.ImageRef), recordBytes.Bytes())
}
func (f *fsPullRecordsAccessor) DeleteImagePulledRecord(imageRef string) error {
err := os.Remove(filepath.Join(f.pulledDir, cacheFilename(imageRef)))
if os.IsNotExist(err) {
return nil
}
return err
}
func cacheFilename(image string) string {
return fmt.Sprintf("%s%x", cacheFilesSHA256Prefix, sha256.Sum256([]byte(image)))
}
// writeFile writes `content` to the file with name `filename` in directory `dir`.
// It assures write atomicity by creating a temporary file first and only after
// a successful write, it move the temp file in place of the target.
func writeFile(dir, filename string, content []byte) error {
// create target folder if it does not exists yet
if err := os.MkdirAll(dir, 0700); err != nil {
return fmt.Errorf("failed to create directory %q: %w", dir, err)
}
targetPath := filepath.Join(dir, filename)
tmpPath := targetPath + tmpFilesSuffix
if err := os.WriteFile(tmpPath, content, 0600); err != nil {
_ = os.Remove(tmpPath) // attempt a delete in case the file was at least partially written
return fmt.Errorf("failed to create temporary file %q: %w", tmpPath, err)
}
if err := os.Rename(tmpPath, targetPath); err != nil {
_ = os.Remove(tmpPath) // attempt a cleanup
return err
}
return nil
}
// processDirFiles reads files in a given directory and peforms `fileAction` action on those.
func processDirFiles(dirName string, fileAction func(filePath string, fileContent []byte) error) error {
var walkErrors []error
err := filepath.WalkDir(dirName, func(path string, d fs.DirEntry, err error) error {
if err != nil {
walkErrors = append(walkErrors, err)
return nil
}
if path == dirName {
return nil
}
if d.IsDir() {
return filepath.SkipDir
}
// skip files we didn't write or .tmp files
if filename := d.Name(); !strings.HasPrefix(filename, cacheFilesSHA256Prefix) || strings.HasSuffix(filename, tmpFilesSuffix) {
return nil
}
fileContent, err := os.ReadFile(path)
if err != nil {
walkErrors = append(walkErrors, fmt.Errorf("failed to read %q: %w", path, err))
return nil
}
if err := fileAction(path, fileContent); err != nil {
walkErrors = append(walkErrors, err)
return nil
}
return nil
})
if err != nil {
walkErrors = append(walkErrors, err)
}
return errors.NewAggregate(walkErrors)
}
// createKubeletCOnfigSchemeEncoderDecoder creates strict-encoding encoder and
// decoder for the internal and alpha kubelet config APIs.
func createKubeletConfigSchemeEncoderDecoder() (runtime.Encoder, runtime.Decoder, error) {
const mediaType = runtime.ContentTypeJSON
scheme := runtime.NewScheme()
if err := kubeletconfigvint1alpha1.AddToScheme(scheme); err != nil {
return nil, nil, err
}
if err := kubeletconfiginternal.AddToScheme(scheme); err != nil {
return nil, nil, err
}
// use the strict scheme to fail on unknown fields
codecs := serializer.NewCodecFactory(scheme, serializer.EnableStrict)
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, nil, fmt.Errorf("unable to locate encoder -- %q is not a supported media type", mediaType)
}
return codecs.EncoderForVersion(info.Serializer, kubeletconfigv1alpha1.SchemeGroupVersion), codecs.UniversalDecoder(), nil
}
func decodeIntent(d runtime.Decoder, objBytes []byte) (*kubeletconfiginternal.ImagePullIntent, error) {
obj, _, err := d.Decode(objBytes, nil, nil)
if err != nil {
return nil, err
}
intentObj, ok := obj.(*kubeletconfiginternal.ImagePullIntent)
if !ok {
return nil, fmt.Errorf("failed to convert object to *ImagePullIntent: %T", obj)
}
return intentObj, nil
}
func decodePulledRecord(d runtime.Decoder, objBytes []byte) (*kubeletconfiginternal.ImagePulledRecord, error) {
obj, _, err := d.Decode(objBytes, nil, nil)
if err != nil {
return nil, err
}
pulledRecord, ok := obj.(*kubeletconfiginternal.ImagePulledRecord)
if !ok {
return nil, fmt.Errorf("failed to convert object to *ImagePulledRecord: %T", obj)
}
return pulledRecord, nil
}

View File

@ -0,0 +1,508 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"fmt"
"slices"
"strings"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/parsers"
)
var _ ImagePullManager = &PullManager{}
// PullManager is an implementation of the ImagePullManager. It
// tracks images pulled by the kubelet by creating records about ongoing and
// successful pulls.
// It tracks the credentials used with each successful pull in order to be able
// to distinguish tenants requesting access to an image that exists on the kubelet's
// node.
type PullManager struct {
recordsAccessor PullRecordsAccessor
imagePolicyEnforcer ImagePullPolicyEnforcer
imageService kubecontainer.ImageService
intentAccessors *StripedLockSet // image -> sync.Mutex
intentCounters *sync.Map // image -> number of current in-flight pulls
pulledAccessors *StripedLockSet // imageRef -> sync.Mutex
}
func NewImagePullManager(ctx context.Context, recordsAccessor PullRecordsAccessor, imagePullPolicy ImagePullPolicyEnforcer, imageService kubecontainer.ImageService, lockStripesNum int32) (*PullManager, error) {
m := &PullManager{
recordsAccessor: recordsAccessor,
imagePolicyEnforcer: imagePullPolicy,
imageService: imageService,
intentAccessors: NewStripedLockSet(lockStripesNum),
intentCounters: &sync.Map{},
pulledAccessors: NewStripedLockSet(lockStripesNum),
}
m.initialize(ctx)
return m, nil
}
func (f *PullManager) RecordPullIntent(image string) error {
f.intentAccessors.Lock(image)
defer f.intentAccessors.Unlock(image)
if err := f.recordsAccessor.WriteImagePullIntent(image); err != nil {
return fmt.Errorf("failed to record image pull intent: %w", err)
}
f.incrementIntentCounterForImage(image)
return nil
}
func (f *PullManager) RecordImagePulled(image, imageRef string, credentials *kubeletconfiginternal.ImagePullCredentials) {
if err := f.writePulledRecordIfChanged(image, imageRef, credentials); err != nil {
klog.ErrorS(err, "failed to write image pulled record", "imageRef", imageRef)
return
}
// Notice we don't decrement in case of record write error, which leaves dangling
// imagePullIntents and refCount in the intentCounters map.
// This is done so that the successfully pulled image is still considered as pulled by the kubelet.
// The kubelet will attempt to turn the imagePullIntent into a pulled record again when
// it's restarted.
f.decrementImagePullIntent(image)
}
// writePulledRecordIfChanged writes an ImagePulledRecord into the f.pulledDir directory.
// `image` is an image from a container of a Pod object.
// `imageRef` is a reference to the `image“ as used by the CRI.
// `credentials` is a set of credentials that should be written to a new/merged into
// an existing record.
//
// If `credentials` is nil, it marks a situation where an image was pulled under
// unknown circumstances. We should record the image as tracked but no credentials
// should be written in order to force credential verification when the image is
// accessed the next time.
func (f *PullManager) writePulledRecordIfChanged(image, imageRef string, credentials *kubeletconfiginternal.ImagePullCredentials) error {
f.pulledAccessors.Lock(imageRef)
defer f.pulledAccessors.Unlock(imageRef)
sanitizedImage, err := trimImageTagDigest(image)
if err != nil {
return fmt.Errorf("invalid image name %q: %w", image, err)
}
pulledRecord, _, err := f.recordsAccessor.GetImagePulledRecord(imageRef)
if err != nil {
klog.InfoS("failed to retrieve an ImagePulledRecord", "image", image, "err", err)
pulledRecord = nil
}
var pulledRecordChanged bool
if pulledRecord == nil {
pulledRecordChanged = true
pulledRecord = &kubeletconfiginternal.ImagePulledRecord{
LastUpdatedTime: metav1.Time{Time: time.Now()},
ImageRef: imageRef,
CredentialMapping: make(map[string]kubeletconfiginternal.ImagePullCredentials),
}
// just the existence of the pulled record for a given imageRef is enough
// for us to consider it kubelet-pulled. The kubelet should fail safe
// if it does not find a credential record for the specific image, and it
// must require credential validation
if credentials != nil {
pulledRecord.CredentialMapping[sanitizedImage] = *credentials
}
} else {
pulledRecord, pulledRecordChanged = pulledRecordMergeNewCreds(pulledRecord, sanitizedImage, credentials)
}
if !pulledRecordChanged {
return nil
}
return f.recordsAccessor.WriteImagePulledRecord(pulledRecord)
}
func (f *PullManager) RecordImagePullFailed(image string) {
f.decrementImagePullIntent(image)
}
// decrementImagePullIntent decreses the number of how many times image pull
// intent for a given `image` was requested, and removes the ImagePullIntent file
// if the reference counter for the image reaches zero.
func (f *PullManager) decrementImagePullIntent(image string) {
f.intentAccessors.Lock(image)
defer f.intentAccessors.Unlock(image)
if f.getIntentCounterForImage(image) <= 1 {
if err := f.recordsAccessor.DeleteImagePullIntent(image); err != nil {
klog.ErrorS(err, "failed to remove image pull intent", "image", image)
return
}
// only delete the intent counter once the file was deleted to be consistent
// with the records
f.intentCounters.Delete(image)
return
}
f.decrementIntentCounterForImage(image)
}
func (f *PullManager) MustAttemptImagePull(image, imageRef string, podSecrets []kubeletconfiginternal.ImagePullSecret) bool {
if len(imageRef) == 0 {
return true
}
var imagePulledByKubelet bool
var pulledRecord *kubeletconfiginternal.ImagePulledRecord
err := func() error {
// don't allow changes to the files we're using for our decision
f.pulledAccessors.Lock(imageRef)
defer f.pulledAccessors.Unlock(imageRef)
f.intentAccessors.Lock(image)
defer f.intentAccessors.Unlock(image)
var err error
var exists bool
pulledRecord, exists, err = f.recordsAccessor.GetImagePulledRecord(imageRef)
switch {
case err != nil:
return err
case exists:
imagePulledByKubelet = true
case pulledRecord != nil:
imagePulledByKubelet = true
default:
// optimized check - we can check the intent number, however, if it's zero
// it may only mean kubelet restarted since writing the intent record and
// we must fall back to the actual cache
imagePulledByKubelet = f.getIntentCounterForImage(image) > 0
if imagePulledByKubelet {
break
}
if exists, err := f.recordsAccessor.ImagePullIntentExists(image); err != nil {
return fmt.Errorf("failed to check existence of an image pull intent: %w", err)
} else if exists {
imagePulledByKubelet = true
}
}
return nil
}()
if err != nil {
klog.ErrorS(err, "Unable to access cache records about image pulls")
return true
}
if !f.imagePolicyEnforcer.RequireCredentialVerificationForImage(image, imagePulledByKubelet) {
return false
}
if pulledRecord == nil {
// we have no proper records of the image being pulled in the past, we can short-circuit here
return true
}
sanitizedImage, err := trimImageTagDigest(image)
if err != nil {
klog.ErrorS(err, "failed to parse image name, forcing image credentials reverification", "image", sanitizedImage)
return true
}
cachedCreds, ok := pulledRecord.CredentialMapping[sanitizedImage]
if !ok {
return true
}
if cachedCreds.NodePodsAccessible {
// anyone on this node can access the image
return false
}
if len(cachedCreds.KubernetesSecrets) == 0 {
return true
}
for _, podSecret := range podSecrets {
for _, cachedSecret := range cachedCreds.KubernetesSecrets {
// we need to check hash len in case hashing failed while storing the record in the keyring
if len(cachedSecret.CredentialHash) > 0 && podSecret.CredentialHash == cachedSecret.CredentialHash {
// TODO: should we record the new secret in case it has different coordinates? If the secret rotates, we will pull unnecessarily otherwise
return false
}
if podSecret.UID == cachedSecret.UID &&
podSecret.Namespace == cachedSecret.Namespace &&
podSecret.Name == cachedSecret.Name {
// TODO: should we record the new creds in this case so that we don't pull if these are present in a different secret?
return false
}
}
}
return true
}
func (f *PullManager) PruneUnknownRecords(imageList []string, until time.Time) {
// TODO: also cleanup the lock maps for intent/pull records?
panic("implement me")
}
// initialize gathers all the images from pull intent records that exist
// from the previous kubelet runs.
// If the CRI reports any of the above images as already pulled, we turn the
// pull intent into a pulled record and the original pull intent is deleted.
//
// This method is not thread-safe and it should only be called upon the creation
// of the PullManager.
func (f *PullManager) initialize(ctx context.Context) {
pullIntents, err := f.recordsAccessor.ListImagePullIntents()
if err != nil {
klog.ErrorS(err, "there was an error listing ImagePullIntents")
return
}
if len(pullIntents) == 0 {
return
}
imageObjs, err := f.imageService.ListImages(ctx)
if err != nil {
klog.ErrorS(err, "failed to list images")
}
inFlightPulls := sets.New[string]()
for _, intent := range pullIntents {
inFlightPulls.Insert(intent.Image)
}
// Each of the images known to the CRI might consist of multiple tags and digests,
// which is what we track in the ImagePullIntent - we need to go through all of these
// for each image.
for _, imageObj := range imageObjs {
existingRecordedImages := searchForExistingTagDigest(inFlightPulls, imageObj)
for _, image := range existingRecordedImages.UnsortedList() {
if err := f.writePulledRecordIfChanged(image, imageObj.ID, nil); err != nil {
klog.ErrorS(err, "failed to write an image pull record", "imageRef", imageObj.ID)
continue
}
if err := f.recordsAccessor.DeleteImagePullIntent(image); err != nil {
klog.V(2).InfoS("failed to remove image pull intent file", "imageName", image, "error", err)
}
}
}
}
func (f *PullManager) incrementIntentCounterForImage(image string) {
f.intentCounters.Store(image, f.getIntentCounterForImage(image)+1)
}
func (f *PullManager) decrementIntentCounterForImage(image string) {
f.intentCounters.Store(image, f.getIntentCounterForImage(image)-1)
}
func (f *PullManager) getIntentCounterForImage(image string) int32 {
intentNumAny, ok := f.intentCounters.Load(image)
if !ok {
return 0
}
intentNum, ok := intentNumAny.(int32)
if !ok {
panic(fmt.Sprintf("expected the intentCounters sync map to only contain int32 values, got %T", intentNumAny))
}
return intentNum
}
var _ ImagePullManager = &NoopImagePullManager{}
type NoopImagePullManager struct{}
func (m *NoopImagePullManager) RecordPullIntent(_ string) error { return nil }
func (m *NoopImagePullManager) RecordImagePulled(_, _ string, _ *kubeletconfiginternal.ImagePullCredentials) {
}
func (m *NoopImagePullManager) RecordImagePullFailed(image string) {}
func (m *NoopImagePullManager) MustAttemptImagePull(_, _ string, _ []kubeletconfiginternal.ImagePullSecret) bool {
return false
}
func (m *NoopImagePullManager) PruneUnknownRecords(_ []string, _ time.Time) {}
// searchForExistingTagDigest loops through the `image` RepoDigests and RepoTags
// and tries to find all image digests/tags in `inFlightPulls`, which is a map of
// containerImage -> pulling intent path.
func searchForExistingTagDigest(inFlightPulls sets.Set[string], image kubecontainer.Image) sets.Set[string] {
existingRecordedImages := sets.New[string]()
for _, digest := range image.RepoDigests {
if ok := inFlightPulls.Has(digest); ok {
existingRecordedImages.Insert(digest)
}
}
for _, tag := range image.RepoTags {
if ok := inFlightPulls.Has(tag); ok {
existingRecordedImages.Insert(tag)
}
}
return existingRecordedImages
}
type kubeSecretCoordinates struct {
UID string
Namespace string
Name string
}
// pulledRecordMergeNewCreds merges the credentials from `newCreds` into the `orig`
// record for the `imageNoTagDigest` image.
// `imageNoTagDigest` is the content of the `image` field from a pod's container
// after any tag or digest were removed from it.
//
// NOTE: pulledRecordMergeNewCreds() may be often called in the read path of
// PullManager.MustAttemptImagePul() and so it's desirable to limit allocations
// (e.g. DeepCopy()) until it is necessary.
func pulledRecordMergeNewCreds(orig *kubeletconfiginternal.ImagePulledRecord, imageNoTagDigest string, newCreds *kubeletconfiginternal.ImagePullCredentials) (*kubeletconfiginternal.ImagePulledRecord, bool) {
if newCreds == nil {
// no new credential information to record
return orig, false
}
if !newCreds.NodePodsAccessible && len(newCreds.KubernetesSecrets) == 0 {
// we don't have any secret credentials or node-wide access to record
// TODO(stlaz,aramase): add in a serviceaccount dimension check
return orig, false
}
selectedCreds, found := orig.CredentialMapping[imageNoTagDigest]
if !found {
ret := orig.DeepCopy()
if ret.CredentialMapping == nil {
ret.CredentialMapping = make(map[string]kubeletconfiginternal.ImagePullCredentials)
}
ret.CredentialMapping[imageNoTagDigest] = *newCreds
ret.LastUpdatedTime = metav1.Time{Time: time.Now()}
return ret, true
}
if selectedCreds.NodePodsAccessible {
return orig, false
}
if newCreds.NodePodsAccessible {
selectedCreds.NodePodsAccessible = true
selectedCreds.KubernetesSecrets = nil
ret := orig.DeepCopy()
ret.CredentialMapping[imageNoTagDigest] = selectedCreds
ret.LastUpdatedTime = metav1.Time{Time: time.Now()}
return ret, true
}
var secretsChanged bool
selectedCreds.KubernetesSecrets, secretsChanged = mergePullSecrets(selectedCreds.KubernetesSecrets, newCreds.KubernetesSecrets)
if !secretsChanged {
return orig, false
}
ret := orig.DeepCopy()
ret.CredentialMapping[imageNoTagDigest] = selectedCreds
ret.LastUpdatedTime = metav1.Time{Time: time.Now()}
return ret, true
}
// mergePullSecrets merges two slices of ImagePullSecret object into one while
// keeping the objects unique per `Namespace, Name, UID` key.
//
// In case an object from the `new` slice has the same `Namespace, Name, UID` combination
// as an object from `orig`, the result will use the CredentialHash value of the
// object from `new`.
//
// The returned slice is sorted by Namespace, Name and UID (in this order). Also
// returns an indicator whether the set of input secrets chaged.
func mergePullSecrets(orig, new []kubeletconfiginternal.ImagePullSecret) ([]kubeletconfiginternal.ImagePullSecret, bool) {
credSet := make(map[kubeSecretCoordinates]string)
for _, secret := range orig {
credSet[kubeSecretCoordinates{
UID: secret.UID,
Namespace: secret.Namespace,
Name: secret.Name,
}] = secret.CredentialHash
}
changed := false
for _, s := range new {
key := kubeSecretCoordinates{UID: s.UID, Namespace: s.Namespace, Name: s.Name}
if existingHash, ok := credSet[key]; !ok || existingHash != s.CredentialHash {
changed = true
credSet[key] = s.CredentialHash
}
}
if !changed {
return orig, false
}
ret := make([]kubeletconfiginternal.ImagePullSecret, 0, len(credSet))
for coords, hash := range credSet {
ret = append(ret, kubeletconfiginternal.ImagePullSecret{
UID: coords.UID,
Namespace: coords.Namespace,
Name: coords.Name,
CredentialHash: hash,
})
}
// we don't need to use the stable version because secret coordinates used for ordering are unique in the set
slices.SortFunc(ret, imagePullSecretLess)
return ret, true
}
// imagePullSecretLess is a helper function to define ordering in a slice of
// ImagePullSecret objects.
func imagePullSecretLess(a, b kubeletconfiginternal.ImagePullSecret) int {
if cmp := strings.Compare(a.Namespace, b.Namespace); cmp != 0 {
return cmp
}
if cmp := strings.Compare(a.Name, b.Name); cmp != 0 {
return cmp
}
return strings.Compare(a.UID, b.UID)
}
// trimImageTagDigest removes the tag and digest from an image name
func trimImageTagDigest(containerImage string) (string, error) {
imageName, _, _, err := parsers.ParseImageName(containerImage)
return imageName, err
}

View File

@ -0,0 +1,891 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/container"
ctesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/test/utils/ktesting"
)
func Test_pulledRecordMergeNewCreds(t *testing.T) {
testTime := metav1.Time{Time: time.Date(2022, 2, 22, 12, 00, 00, 00, time.Local)}
testRecord := &kubeletconfiginternal.ImagePulledRecord{
ImageRef: "testImageRef",
LastUpdatedTime: testTime,
CredentialMapping: map[string]kubeletconfiginternal.ImagePullCredentials{
"test-image1": {
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid1", Namespace: "namespace1", Name: "name1", CredentialHash: "hash1"},
},
},
"test-image2": {
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid1", Namespace: "namespace1", Name: "name1", CredentialHash: "hash1"},
{UID: "uid2", Namespace: "namespace2", Name: "name2", CredentialHash: "hash2"},
},
},
"test-nodewide": {
NodePodsAccessible: true,
},
},
}
tests := []struct {
name string
current *kubeletconfiginternal.ImagePulledRecord
image string
credsForMerging *kubeletconfiginternal.ImagePullCredentials
expectedRecord *kubeletconfiginternal.ImagePulledRecord
wantUpdate bool
}{
{
name: "create a new image record",
image: "new-image",
current: testRecord.DeepCopy(),
credsForMerging: &kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "newuid", Namespace: "newnamespace", Name: "newname", CredentialHash: "newhash"},
},
},
expectedRecord: withImageRecord(testRecord.DeepCopy(), "new-image",
kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "newuid", Namespace: "newnamespace", Name: "newname", CredentialHash: "newhash"},
},
},
),
wantUpdate: true,
},
{
name: "merge with an existing image secret",
image: "test-image1",
current: testRecord.DeepCopy(),
credsForMerging: &kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "newuid", Namespace: "newnamespace", Name: "newname", CredentialHash: "newhash"},
},
},
expectedRecord: withImageRecord(testRecord.DeepCopy(), "test-image1",
kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid1", Namespace: "namespace1", Name: "name1", CredentialHash: "hash1"},
{UID: "newuid", Namespace: "newnamespace", Name: "newname", CredentialHash: "newhash"},
},
},
),
wantUpdate: true,
},
{
name: "merge with existing image record secrets",
image: "test-image2",
current: testRecord.DeepCopy(),
credsForMerging: &kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "newuid", Namespace: "namespace1", Name: "newname", CredentialHash: "newhash"},
},
},
expectedRecord: withImageRecord(testRecord.DeepCopy(), "test-image2",
kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid1", Namespace: "namespace1", Name: "name1", CredentialHash: "hash1"},
{UID: "newuid", Namespace: "namespace1", Name: "newname", CredentialHash: "newhash"},
{UID: "uid2", Namespace: "namespace2", Name: "name2", CredentialHash: "hash2"},
},
},
),
wantUpdate: true,
},
{
name: "node-accessible overrides all secrets",
image: "test-image2",
current: testRecord.DeepCopy(),
credsForMerging: &kubeletconfiginternal.ImagePullCredentials{
NodePodsAccessible: true,
},
expectedRecord: withImageRecord(testRecord.DeepCopy(), "test-image2",
kubeletconfiginternal.ImagePullCredentials{
NodePodsAccessible: true,
},
),
wantUpdate: true,
},
{
name: "new creds have the same secret coordinates but a different hash",
image: "test-image2",
current: testRecord.DeepCopy(),
credsForMerging: &kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid2", Namespace: "namespace2", Name: "name2", CredentialHash: "newhash"},
},
},
expectedRecord: withImageRecord(testRecord.DeepCopy(), "test-image2",
kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid1", Namespace: "namespace1", Name: "name1", CredentialHash: "hash1"},
{UID: "uid2", Namespace: "namespace2", Name: "name2", CredentialHash: "newhash"},
},
},
),
wantUpdate: true,
},
{
name: "new creds have the same hash but a different coordinates",
image: "test-image2",
current: testRecord.DeepCopy(),
credsForMerging: &kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid3", Namespace: "namespace2", Name: "name3", CredentialHash: "hash2"},
},
},
expectedRecord: withImageRecord(testRecord.DeepCopy(), "test-image2",
kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "uid1", Namespace: "namespace1", Name: "name1", CredentialHash: "hash1"},
{UID: "uid2", Namespace: "namespace2", Name: "name2", CredentialHash: "hash2"},
{UID: "uid3", Namespace: "namespace2", Name: "name3", CredentialHash: "hash2"},
},
},
),
wantUpdate: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotRecord, gotUpdate := pulledRecordMergeNewCreds(tt.current, tt.image, tt.credsForMerging)
if gotUpdate != tt.wantUpdate {
t.Errorf("pulledRecordMergeNewCreds() gotUpdate = %v, wantUpdate %v", gotUpdate, tt.wantUpdate)
}
if origTime, newTime := tt.expectedRecord.LastUpdatedTime, gotRecord.LastUpdatedTime; tt.wantUpdate && !origTime.Before(&newTime) {
t.Errorf("expected the new update time to be after the original update time: %v > %v", origTime, newTime)
}
// make the new update time equal to the expected time for the below comparison now
gotRecord.LastUpdatedTime = tt.expectedRecord.LastUpdatedTime
if !reflect.DeepEqual(gotRecord, tt.expectedRecord) {
t.Errorf("pulledRecordMergeNewCreds() difference between got/expected: %v", cmp.Diff(tt.expectedRecord, gotRecord))
}
})
}
}
func TestFileBasedImagePullManager_MustAttemptImagePull(t *testing.T) {
tests := []struct {
name string
imagePullPolicy ImagePullPolicyEnforcer
podSecrets []kubeletconfiginternal.ImagePullSecret
image string
imageRef string
pulledFiles []string
pullingFiles []string
want bool
}{
{
name: "image exists and is recorded with pod's exact secret",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testimageref",
pulledFiles: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
want: false,
},
{
name: "image exists and is recorded, no pod secrets",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
image: "docker.io/testing/test:latest",
imageRef: "testimageref",
pulledFiles: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
want: true,
},
{
name: "image exists and is recorded with the same secret but different credential hash",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "differenthash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testimageref",
pulledFiles: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
want: false,
},
{
name: "image exists and is recorded with a different secret with a different UID",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "different uid", Namespace: "default", Name: "pull-secret", CredentialHash: "differenthash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testimageref",
pulledFiles: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
want: true,
},
{
name: "image exists and is recorded with a different secret",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "differentns", Name: "pull-secret", CredentialHash: "differenthash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testimageref",
pulledFiles: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
want: true,
},
{
name: "image exists and is recorded with a different secret with the same credential hash",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "differentns", Name: "pull-secret", CredentialHash: "testsecrethash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testimageref",
pulledFiles: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
want: false,
},
{
name: "image exists but the pull is recorded with a different image name but with the exact same secret",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash",
},
},
image: "docker.io/testing/different:latest",
imageRef: "testimageref",
pulledFiles: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
want: true,
},
{
name: "image exists and is recorded with empty credential mapping",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
image: "docker.io/testing/test:latest",
imageRef: "testemptycredmapping",
pulledFiles: []string{"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991"},
want: true,
},
{
name: "image does not exist and there are no records of it",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
image: "docker.io/testing/test:latest",
imageRef: "",
want: true,
},
{
name: "image exists and there are no records of it with NeverVerifyPreloadedImages pull policy",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
image: "docker.io/testing/test:latest",
imageRef: "testexistingref",
want: false,
},
{
name: "image exists and there are no records of it with AlwaysVerify pull policy",
imagePullPolicy: AlwaysVerifyImagePullPolicy(),
image: "docker.io/testing/test:latest",
imageRef: "testexistingref",
want: true,
},
{
name: "image exists but is only recorded via pulling intent",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testexistingref",
pullingFiles: []string{"sha256-aef2af226629a35d5f3ef0fdbb29fdbebf038d0acd8850590e8c48e1e283aa56"},
want: true,
},
{
name: "image exists but is only recorded via pulling intent - NeverVerify policy",
imagePullPolicy: NeverVerifyImagePullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testexistingref",
pullingFiles: []string{"sha256-aef2af226629a35d5f3ef0fdbb29fdbebf038d0acd8850590e8c48e1e283aa56"},
want: false,
},
{
name: "image exists and is recorded as node-accessible, no pod secrets",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
image: "docker.io/testing/test:latest",
imageRef: "testimage-anonpull",
pulledFiles: []string{"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a"},
want: false,
},
{
name: "image exists and is recorded as node-accessible, request with pod secrets",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash",
},
},
image: "docker.io/testing/test:latest",
imageRef: "testimage-anonpull",
pulledFiles: []string{"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a"},
want: false,
},
{
name: "image exists and is recorded with empty hash as its hashing originally failed, the same fail for a different pod secret",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "differentns", Name: "pull-secret", CredentialHash: "",
},
},
image: "docker.io/testing/test:latest",
imageRef: "test-brokenhash",
pulledFiles: []string{"sha256-38a8906435c4dd5f4258899d46621bfd8eea3ad6ff494ee3c2f17ef0321625bd"},
want: true,
},
{
name: "image exists and is recorded with empty hash as its hashing originally failed, the same fail for the same pod secret",
imagePullPolicy: NeverVerifyPreloadedPullPolicy(),
podSecrets: []kubeletconfiginternal.ImagePullSecret{
{
UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "",
},
},
image: "docker.io/testing/test:latest",
imageRef: "test-brokenhash",
pulledFiles: []string{"sha256-38a8906435c4dd5f4258899d46621bfd8eea3ad6ff494ee3c2f17ef0321625bd"},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
encoder, decoder, err := createKubeletConfigSchemeEncoderDecoder()
require.NoError(t, err)
testDir := t.TempDir()
pullingDir := filepath.Join(testDir, "pulling")
pulledDir := filepath.Join(testDir, "pulled")
copyTestData(t, pullingDir, "pulling", tt.pullingFiles)
copyTestData(t, pulledDir, "pulled", tt.pulledFiles)
fsRecordAccessor := &fsPullRecordsAccessor{
pullingDir: pullingDir,
pulledDir: pulledDir,
encoder: encoder,
decoder: decoder,
}
f := &PullManager{
recordsAccessor: fsRecordAccessor,
imagePolicyEnforcer: tt.imagePullPolicy,
intentAccessors: NewStripedLockSet(10),
intentCounters: &sync.Map{},
pulledAccessors: NewStripedLockSet(10),
}
if got := f.MustAttemptImagePull(tt.image, tt.imageRef, tt.podSecrets); got != tt.want {
t.Errorf("FileBasedImagePullManager.MustAttemptImagePull() = %v, want %v", got, tt.want)
}
})
}
}
func TestFileBasedImagePullManager_RecordPullIntent(t *testing.T) {
tests := []struct {
name string
inputImage string
wantFile string
startCounter int32
wantCounter int32
}{
{
name: "first pull",
inputImage: "repo.repo/test/test:latest",
wantFile: "sha256-7d8c031e2f1aeaa71649ca3e0b64c9902370ed460ef57fb07582a87a5a1e1c02",
wantCounter: 1,
},
{
name: "first pull",
inputImage: "repo.repo/test/test:latest",
wantFile: "sha256-7d8c031e2f1aeaa71649ca3e0b64c9902370ed460ef57fb07582a87a5a1e1c02",
startCounter: 1,
wantCounter: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
encoder, decoder, err := createKubeletConfigSchemeEncoderDecoder()
require.NoError(t, err)
testDir := t.TempDir()
pullingDir := filepath.Join(testDir, "pulling")
fsRecordAccessor := &fsPullRecordsAccessor{
pullingDir: pullingDir,
encoder: encoder,
decoder: decoder,
}
f := &PullManager{
recordsAccessor: fsRecordAccessor,
intentAccessors: NewStripedLockSet(10),
intentCounters: &sync.Map{},
}
if tt.startCounter > 0 {
f.intentCounters.Store(tt.inputImage, tt.startCounter)
}
_ = f.RecordPullIntent(tt.inputImage)
expectFilename := filepath.Join(pullingDir, tt.wantFile)
require.FileExists(t, expectFilename)
require.Equal(t, tt.wantCounter, f.getIntentCounterForImage(tt.inputImage), "pull intent counter does not match")
expected := kubeletconfiginternal.ImagePullIntent{
Image: tt.inputImage,
}
gotBytes, err := os.ReadFile(expectFilename)
if err != nil {
t.Fatalf("failed to read the expected file: %v", err)
}
var got kubeletconfiginternal.ImagePullIntent
if _, _, err := decoder.Decode(gotBytes, nil, &got); err != nil {
t.Fatalf("failed to unmarshal the created file data: %v", err)
}
if !reflect.DeepEqual(expected, got) {
t.Errorf("expected ImagePullIntent != got; diff: %s", cmp.Diff(expected, got))
}
})
}
}
func TestFileBasedImagePullManager_RecordImagePulled(t *testing.T) {
tests := []struct {
name string
image string
imageRef string
creds *kubeletconfiginternal.ImagePullCredentials
pullsInFlight int32
existingPulling []string
existingPulled []string
expectPullingRemoved string
expectPulled []string
checkedPullFile string
expectedPullRecord kubeletconfiginternal.ImagePulledRecord
expectUpdated bool
}{
{
name: "new pull record",
image: "repo.repo/test/test:v1",
imageRef: "testimageref",
creds: &kubeletconfiginternal.ImagePullCredentials{NodePodsAccessible: true},
expectPulled: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
existingPulling: []string{"sha256-ee81caca15454863449fb55a1d942904d56d5ed9f9b20a7cb3453944ea2c7e11"},
pullsInFlight: 1,
expectPullingRemoved: "sha256-ee81caca15454863449fb55a1d942904d56d5ed9f9b20a7cb3453944ea2c7e11",
checkedPullFile: "sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
expectUpdated: true,
expectedPullRecord: kubeletconfiginternal.ImagePulledRecord{
ImageRef: "testimageref",
CredentialMapping: map[string]kubeletconfiginternal.ImagePullCredentials{
"repo.repo/test/test": {
NodePodsAccessible: true,
},
},
},
},
{
name: "new pull record, more puls in-flight",
image: "repo.repo/test/test:v1",
imageRef: "testimageref",
creds: &kubeletconfiginternal.ImagePullCredentials{NodePodsAccessible: true},
expectPulled: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
existingPulling: []string{"sha256-ee81caca15454863449fb55a1d942904d56d5ed9f9b20a7cb3453944ea2c7e11"},
pullsInFlight: 2,
checkedPullFile: "sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
expectUpdated: true,
expectedPullRecord: kubeletconfiginternal.ImagePulledRecord{
ImageRef: "testimageref",
CredentialMapping: map[string]kubeletconfiginternal.ImagePullCredentials{
"repo.repo/test/test": {
NodePodsAccessible: true,
},
},
},
},
{
name: "merge into existing record",
image: "repo.repo/test/test:v1",
imageRef: "testimageref",
creds: &kubeletconfiginternal.ImagePullCredentials{NodePodsAccessible: true},
existingPulled: []string{
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
},
expectPulled: []string{
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
},
existingPulling: []string{"sha256-ee81caca15454863449fb55a1d942904d56d5ed9f9b20a7cb3453944ea2c7e11"},
pullsInFlight: 1,
expectPullingRemoved: "sha256-ee81caca15454863449fb55a1d942904d56d5ed9f9b20a7cb3453944ea2c7e11",
checkedPullFile: "sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
expectUpdated: true,
expectedPullRecord: kubeletconfiginternal.ImagePulledRecord{
ImageRef: "testimageref",
CredentialMapping: map[string]kubeletconfiginternal.ImagePullCredentials{
"repo.repo/test/test": {
NodePodsAccessible: true,
},
"docker.io/testing/test": {
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash"},
},
},
},
},
},
{
name: "merge into existing record - existing key in creds mapping",
image: "docker.io/testing/test:something",
imageRef: "testimageref",
creds: &kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{{UID: "newuid", Namespace: "newns", Name: "newname", CredentialHash: "somehash"}},
},
existingPulled: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
pullsInFlight: 1,
expectPulled: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
existingPulling: []string{"sha256-f24acc752be18b93b0504c86312bbaf482c9efb0c45e925bbccb0a591cebd7af"},
expectPullingRemoved: "sha256-f24acc752be18b93b0504c86312bbaf482c9efb0c45e925bbccb0a591cebd7af",
checkedPullFile: "sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
expectUpdated: true,
expectedPullRecord: kubeletconfiginternal.ImagePulledRecord{
ImageRef: "testimageref",
CredentialMapping: map[string]kubeletconfiginternal.ImagePullCredentials{
"docker.io/testing/test": {
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash"},
{UID: "newuid", Namespace: "newns", Name: "newname", CredentialHash: "somehash"},
},
},
},
},
},
{
name: "existing record stays unchanged",
image: "docker.io/testing/test:something",
imageRef: "testimageref",
creds: &kubeletconfiginternal.ImagePullCredentials{
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{{UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash"}},
},
existingPulled: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
expectPulled: []string{"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064"},
existingPulling: []string{"sha256-f24acc752be18b93b0504c86312bbaf482c9efb0c45e925bbccb0a591cebd7af"},
pullsInFlight: 1,
expectPullingRemoved: "sha256-f24acc752be18b93b0504c86312bbaf482c9efb0c45e925bbccb0a591cebd7af",
checkedPullFile: "sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
expectUpdated: false,
expectedPullRecord: kubeletconfiginternal.ImagePulledRecord{
ImageRef: "testimageref",
CredentialMapping: map[string]kubeletconfiginternal.ImagePullCredentials{
"docker.io/testing/test": {
KubernetesSecrets: []kubeletconfiginternal.ImagePullSecret{
{UID: "testsecretuid", Namespace: "default", Name: "pull-secret", CredentialHash: "testsecrethash"},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
encoder, decoder, err := createKubeletConfigSchemeEncoderDecoder()
require.NoError(t, err)
testDir := t.TempDir()
pullingDir := filepath.Join(testDir, "pulling")
pulledDir := filepath.Join(testDir, "pulled")
copyTestData(t, pullingDir, "pulling", tt.existingPulling)
copyTestData(t, pulledDir, "pulled", tt.existingPulled)
fsRecordAccessor := &fsPullRecordsAccessor{
pullingDir: pullingDir,
pulledDir: pulledDir,
encoder: encoder,
decoder: decoder,
}
f := &PullManager{
recordsAccessor: fsRecordAccessor,
intentAccessors: NewStripedLockSet(10),
intentCounters: &sync.Map{},
pulledAccessors: NewStripedLockSet(10),
}
f.intentCounters.Store(tt.image, tt.pullsInFlight)
origIntentCounter := f.getIntentCounterForImage(tt.image)
f.RecordImagePulled(tt.image, tt.imageRef, tt.creds)
require.Equal(t, f.getIntentCounterForImage(tt.image), origIntentCounter-1, "intent counter for %s was not decremented", tt.image)
for _, fname := range tt.expectPulled {
expectFilename := filepath.Join(pulledDir, fname)
require.FileExists(t, expectFilename)
}
if len(tt.expectPullingRemoved) > 0 {
dontExpectFilename := filepath.Join(pullingDir, tt.expectPullingRemoved)
require.NoFileExists(t, dontExpectFilename)
}
pulledBytes, err := os.ReadFile(filepath.Join(pulledDir, tt.checkedPullFile))
if err != nil {
t.Fatalf("failed to read the expected image pulled record: %v", err)
}
got, err := decodePulledRecord(decoder, pulledBytes)
if err != nil {
t.Fatalf("failed to deserialize the image pulled record: %v", err)
}
if tt.expectUpdated {
require.True(t, got.LastUpdatedTime.After(time.Now().Add(-1*time.Minute)), "expected the record to be updated but it didn't - last update time %s", got.LastUpdatedTime.String())
} else {
require.True(t, got.LastUpdatedTime.Before(&metav1.Time{Time: time.Now().Add(-240 * time.Minute)}), "expected the record to NOT be updated but it was - last update time %s", got.LastUpdatedTime.String())
}
got.LastUpdatedTime = tt.expectedPullRecord.LastUpdatedTime
if !reflect.DeepEqual(got, &tt.expectedPullRecord) {
t.Errorf("expected ImagePulledRecord != got; diff: %s", cmp.Diff(tt.expectedPullRecord, got))
}
})
}
}
func TestFileBasedImagePullManager_initialize(t *testing.T) {
imageService := &ctesting.FakeRuntime{
ImageList: []container.Image{
{
ID: "testimageref1",
RepoTags: []string{"repo.repo/test/test:docker", "docker.io/testing/test:something"},
},
{
ID: "testimageref2",
RepoTags: []string{"repo.repo/test/test:v2", "repo.repo/test/test:test2"},
RepoDigests: []string{"repo.repo/test/test@dgst2"},
},
{
ID: "testimageref",
RepoTags: []string{"repo.repo/test/test:v1", "repo.repo/test/test:test1"},
RepoDigests: []string{"repo.repo/test/test@dgst1"},
},
{
ID: "testimageref3",
RepoTags: []string{"repo.repo/test/test:v3", "repo.repo/test/test:test3"},
},
{
ID: "testimageref4",
RepoDigests: []string{"repo.repo/test/test@dgst4", "repo.repo/test/notatest@dgst44"},
},
},
}
tests := []struct {
name string
existingIntents []string
existingPulledRecords []string
expectedIntents sets.Set[string]
expectedPulled sets.Set[string]
}{
{
name: "no pulling/pulled records",
},
{
name: "only pulled records",
existingPulledRecords: []string{
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
},
expectedPulled: sets.New(
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991"),
},
{
name: "pulling intent that matches an existing image - no matching pulled record",
existingPulledRecords: []string{
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
},
existingIntents: []string{
"sha256-f24acc752be18b93b0504c86312bbaf482c9efb0c45e925bbccb0a591cebd7af",
},
expectedPulled: sets.New(
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
"sha256-d77ed7480bc819274ea7a4dba5b2699b2d3f73c6e578762df42e5a8224771096",
),
},
{
name: "pulling intent that matches an existing image - a pull record matches",
existingPulledRecords: []string{
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
},
existingIntents: []string{
"sha256-ee81caca15454863449fb55a1d942904d56d5ed9f9b20a7cb3453944ea2c7e11",
},
expectedPulled: sets.New(
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
),
},
{
name: "multiple pulling intents that match existing images",
existingPulledRecords: []string{
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
},
existingIntents: []string{
"sha256-ee81caca15454863449fb55a1d942904d56d5ed9f9b20a7cb3453944ea2c7e11",
"sha256-f24acc752be18b93b0504c86312bbaf482c9efb0c45e925bbccb0a591cebd7af",
},
expectedPulled: sets.New(
"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
"sha256-d77ed7480bc819274ea7a4dba5b2699b2d3f73c6e578762df42e5a8224771096",
),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testCtx := ktesting.Init(t)
encoder, decoder, err := createKubeletConfigSchemeEncoderDecoder()
require.NoError(t, err)
testDir := t.TempDir()
pullingDir := filepath.Join(testDir, "pulling")
pulledDir := filepath.Join(testDir, "pulled")
if err := os.MkdirAll(pullingDir, 0700); err != nil {
t.Fatal(err)
}
if err := os.MkdirAll(pulledDir, 0700); err != nil {
t.Fatal(err)
}
copyTestData(t, pullingDir, "pulling", tt.existingIntents)
copyTestData(t, pulledDir, "pulled", tt.existingPulledRecords)
fsRecordAccessor := &fsPullRecordsAccessor{
pullingDir: pullingDir,
pulledDir: pulledDir,
encoder: encoder,
decoder: decoder,
}
f := &PullManager{
recordsAccessor: fsRecordAccessor,
imageService: imageService,
intentAccessors: NewStripedLockSet(10),
intentCounters: &sync.Map{},
pulledAccessors: NewStripedLockSet(10),
}
f.initialize(testCtx)
gotIntents := sets.New[string]()
if err := processDirFiles(pullingDir, func(filePath string, fileContent []byte) error {
gotIntents.Insert(filepath.Base(filePath))
return nil
}); err != nil {
t.Fatalf("there was an error processing file in the test output pulling dir: %v", err)
}
gotPulled := sets.New[string]()
if err := processDirFiles(pulledDir, func(filePath string, fileContent []byte) error {
gotPulled.Insert(filepath.Base(filePath))
return nil
}); err != nil {
t.Fatalf("there was an error processing file in the test output pulled dir: %v", err)
}
if !gotIntents.Equal(tt.expectedIntents) {
t.Errorf("difference between expected and received pull intent files: %v", cmp.Diff(tt.expectedIntents, gotIntents))
}
if !gotPulled.Equal(tt.expectedPulled) {
t.Errorf("difference between expected and received pull record files: %v", cmp.Diff(tt.expectedPulled, gotPulled))
}
})
}
}
func copyTestData(t *testing.T, dstDir string, testdataDir string, src []string) {
for _, f := range src {
testBytes, err := os.ReadFile(filepath.Join("testdata", testdataDir, f))
if err != nil {
t.Fatalf("failed to read test data: %v", err)
}
if err := writeFile(dstDir, f, testBytes); err != nil {
t.Fatalf("failed to write test data: %v", err)
}
}
}
func withImageRecord(r *kubeletconfiginternal.ImagePulledRecord, image string, record kubeletconfiginternal.ImagePullCredentials) *kubeletconfiginternal.ImagePulledRecord {
r.CredentialMapping[image] = record
return r
}

View File

@ -0,0 +1,67 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"hash/fnv"
"sync"
)
// StripedLockSet allows context locking based on string keys, where each key
// is mapped to a an index in a size-limited slice of locks.
type StripedLockSet struct {
locks []sync.Mutex
size int32
}
// NewStripedLockSet creates a StripedLockSet with `size` number of locks to be
// used for locking context based on string keys.
// The size will be normalized to stay in the <1, 31> interval.
func NewStripedLockSet(size int32) *StripedLockSet {
size = max(size, 1) // make sure we're at least at size 1
return &StripedLockSet{
locks: make([]sync.Mutex, min(31, size)),
size: size,
}
}
func (s *StripedLockSet) Lock(key string) {
s.locks[keyToID(key, s.size)].Lock()
}
func (s *StripedLockSet) Unlock(key string) {
s.locks[keyToID(key, s.size)].Unlock()
}
func (s *StripedLockSet) GlobalLock() {
for i := range s.locks {
s.locks[i].Lock()
}
}
func (s *StripedLockSet) GlobalUnlock() {
for i := range s.locks {
s.locks[i].Unlock()
}
}
func keyToID(key string, sliceSize int32) uint32 {
h := fnv.New32()
h.Write([]byte(key))
return h.Sum32() % uint32(sliceSize)
}

View File

@ -0,0 +1 @@
{"kind":"ImagePulledRecord","apiVersion":"kubelet.config.k8s.io/v1alpha1","lastUpdatedTime":"2024-10-21T12:26:40Z","imageRef":"test-brokenhash","credentialMapping":{"docker.io/testing/test":{"kubernetesSecrets":[{"uid":"testsecretuid","namespace":"default","name":"pull-secret","credentialHash":""}]}}}

View File

@ -0,0 +1 @@
{"kind":"ImagePulledRecord","apiVersion":"kubelet.config.k8s.io/v1alpha1","lastUpdatedTime":"2024-10-21T12:26:40Z","imageRef":"testimage-anonpull","credentialMapping":{"docker.io/testing/test":{"nodePodsAccessible":true}}}

View File

@ -0,0 +1 @@
{"kind":"ImagePulledRecord","apiVersion":"kubelet.config.k8s.io/v1alpha1","lastUpdatedTime":"2024-10-21T12:26:40Z","imageRef":"testimageref","credentialMapping":{"docker.io/testing/test":{"kubernetesSecrets":[{"uid":"testsecretuid","namespace":"default","name":"pull-secret","credentialHash":"testsecrethash"}]}}}

View File

@ -0,0 +1 @@
{"kind":"ImagePulledRecord","apiVersion":"kubelet.config.k8s.io/v1alpha1","lastUpdatedTime":"2024-10-21T12:26:40Z","imageRef":"testemptycredmapping"}

View File

@ -0,0 +1 @@
{"kind":"ImagePullIntent","apiVersion":"kubelet.config.k8s.io/v1alpha1","image":"docker.io/testing/test:latest"}

View File

@ -0,0 +1 @@
{"kind":"ImagePullIntent","apiVersion":"kubelet.config.k8s.io/v1alpha1","image":"repo.repo/test/test:v1"}

View File

@ -0,0 +1 @@
{"kind":"ImagePullIntent","apiVersion":"kubelet.config.k8s.io/v1alpha1","image":"docker.io/testing/test:something"}

View File

@ -103,3 +103,41 @@ type ImagePullManager interface {
// `until` is a timestamp created _before_ the `imageList` was requested from the CRI.
PruneUnknownRecords(imageList []string, until time.Time)
}
// PullRecordsAccessor allows unified access to ImagePullIntents/ImagePulledRecords
// irregardless of the backing database implementation
type PullRecordsAccessor interface {
// ListImagePullIntents lists all the ImagePullIntents in the database.
// ImagePullIntents that cannot be decoded will not appear in the list.
// Returns nil and an error if there was a problem reading from the database.
//
// This method may return partial success in case there were errors listing
// the results. A list of records that were successfully read and an aggregated
// error is returned in that case.
ListImagePullIntents() ([]*kubeletconfiginternal.ImagePullIntent, error)
// ImagePullIntentExists returns whether a valid ImagePullIntent is present
// for the given image.
ImagePullIntentExists(image string) (bool, error)
// WriteImagePullIntent writes a an intent record for the image into the database
WriteImagePullIntent(image string) error
// DeleteImagePullIntent removes an `image` intent record from the database
DeleteImagePullIntent(image string) error
// ListImagePulledRecords lists the database ImagePulledRecords.
// Records that cannot be decoded will be ignored.
// Returns an error if there was a problem reading from the database.
//
// This method may return partial success in case there were errors listing
// the results. A list of records that were successfully read and an aggregated
// error is returned in that case.
ListImagePulledRecords() ([]*kubeletconfiginternal.ImagePulledRecord, error)
// GetImagePulledRecord fetches an ImagePulledRecord for the given `imageRef`.
// If a file for the `imageRef` is present but the contents cannot be decoded,
// it returns a exists=true with err equal to the decoding error.
GetImagePulledRecord(imageRef string) (record *kubeletconfiginternal.ImagePulledRecord, exists bool, err error)
// WriteImagePulledRecord writes an ImagePulledRecord into the database.
WriteImagePulledRecord(record *kubeletconfiginternal.ImagePulledRecord) error
// DeleteImagePulledRecord removes an ImagePulledRecord for `imageRef` from the
// database.
DeleteImagePulledRecord(imageRef string) error
}