kubelet: GC for image pull managers

This commit is contained in:
Stanislav Láznička 2024-10-23 14:08:56 +02:00
parent 3793becbb9
commit b8fc6042ca
No known key found for this signature in database
GPG Key ID: F8D8054395A1D157
7 changed files with 185 additions and 26 deletions

View File

@ -57,6 +57,11 @@ const (
ImageGarbageCollectedTotalReasonSpace = "space"
)
// PostImageGCHook allows external sources to react to GC collect events.
// `remainingImages` is a list of images that were left on the system after garbage
// collection finished.
type PostImageGCHook func(remainingImages []string, gcStart time.Time)
// StatsProvider is an interface for fetching stats used during image garbage
// collection.
type StatsProvider interface {
@ -128,6 +133,8 @@ type realImageGCManager struct {
// imageCache is the cache of latest image list.
imageCache imageCache
postGCHooks []PostImageGCHook
// tracer for recording spans
tracer trace.Tracer
}
@ -181,7 +188,7 @@ type imageRecord struct {
}
// NewImageGCManager instantiates a new ImageGCManager object.
func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, tracerProvider trace.TracerProvider) (ImageGCManager, error) {
func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, postGCHooks []PostImageGCHook, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, tracerProvider trace.TracerProvider) (ImageGCManager, error) {
// Validate policy.
if policy.HighThresholdPercent < 0 || policy.HighThresholdPercent > 100 {
return nil, fmt.Errorf("invalid HighThresholdPercent %d, must be in range [0-100]", policy.HighThresholdPercent)
@ -200,6 +207,7 @@ func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, r
statsProvider: statsProvider,
recorder: recorder,
nodeRef: nodeRef,
postGCHooks: postGCHooks,
tracer: tracer,
}
@ -381,11 +389,13 @@ func (im *realImageGCManager) GarbageCollect(ctx context.Context, beganGC time.T
if usagePercent >= im.policy.HighThresholdPercent {
amountToFree := capacity*int64(100-im.policy.LowThresholdPercent)/100 - available
klog.InfoS("Disk usage on image filesystem is over the high threshold, trying to free bytes down to the low threshold", "usage", usagePercent, "highThreshold", im.policy.HighThresholdPercent, "amountToFree", amountToFree, "lowThreshold", im.policy.LowThresholdPercent)
freed, err := im.freeSpace(ctx, amountToFree, freeTime, images)
remainingImages, freed, err := im.freeSpace(ctx, amountToFree, freeTime, images)
if err != nil {
return err
}
im.runPostGCHooks(remainingImages, freeTime)
if freed < amountToFree {
err := fmt.Errorf("Failed to garbage collect required amount of images. Attempted to free %d bytes, but only found %d bytes eligible to free.", amountToFree, freed)
im.recorder.Eventf(im.nodeRef, v1.EventTypeWarning, events.FreeDiskSpaceFailed, err.Error())
@ -396,6 +406,12 @@ func (im *realImageGCManager) GarbageCollect(ctx context.Context, beganGC time.T
return nil
}
// runPostGCHooks invokes every registered PostImageGCHook with the list of
// images that remained on the node after garbage collection and the time at
// which the GC run started.
func (im *realImageGCManager) runPostGCHooks(remainingImages []string, gcStartTime time.Time) {
	for i := range im.postGCHooks {
		im.postGCHooks[i](remainingImages, gcStartTime)
	}
}
func (im *realImageGCManager) freeOldImages(ctx context.Context, images []evictionInfo, freeTime, beganGC time.Time) ([]evictionInfo, error) {
if im.policy.MaxAge == 0 {
return images, nil
@ -430,29 +446,38 @@ func (im *realImageGCManager) freeOldImages(ctx context.Context, images []evicti
// DeleteUnusedImages attempts to free every image that is not currently in
// use, then notifies the registered post-GC hooks with the list of images
// that remained on disk.
//
// Returns an error if listing images or freeing space failed; hooks only run
// after a successful cleanup pass.
func (im *realImageGCManager) DeleteUnusedImages(ctx context.Context) error {
	klog.InfoS("Attempting to delete unused images")
	freeTime := time.Now()
	images, err := im.imagesInEvictionOrder(ctx, freeTime)
	if err != nil {
		return err
	}

	// Request math.MaxInt64 bytes so that every eligible (unused, old enough)
	// image is removed rather than stopping at a space threshold.
	remainingImages, _, err := im.freeSpace(ctx, math.MaxInt64, freeTime, images)
	if err != nil {
		return err
	}

	im.runPostGCHooks(remainingImages, freeTime)
	return nil
}
// Tries to free bytesToFree worth of images on the disk.
//
// Returns the number of bytes free and an error if any occurred. The number of
// bytes freed is always returned.
// Returns the images that are still available after the cleanup, the number of bytes freed
// and an error if any occurred. The number of bytes freed is always returned.
// Note that error may be nil and the number of bytes free may be less
// than bytesToFree.
func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64, freeTime time.Time, images []evictionInfo) (int64, error) {
func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64, freeTime time.Time, images []evictionInfo) ([]string, int64, error) {
// Delete unused images until we've freed up enough space.
var deletionErrors []error
spaceFreed := int64(0)
var imagesLeft []string
for _, image := range images {
klog.V(5).InfoS("Evaluating image ID for possible garbage collection based on disk usage", "imageID", image.id, "runtimeHandler", image.imageRecord.runtimeHandlerUsedToPullImage)
// Images that are currently in used were given a newer lastUsed.
if image.lastUsed.Equal(freeTime) || image.lastUsed.After(freeTime) {
klog.V(5).InfoS("Image ID was used too recently, not eligible for garbage collection", "imageID", image.id, "lastUsed", image.lastUsed, "freeTime", freeTime)
imagesLeft = append(imagesLeft, image.id)
continue
}
@ -460,11 +485,13 @@ func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64,
// In such a case, the image may have just been pulled down, and will be used by a container right away.
if freeTime.Sub(image.firstDetected) < im.policy.MinAge {
klog.V(5).InfoS("Image ID's age is less than the policy's minAge, not eligible for garbage collection", "imageID", image.id, "age", freeTime.Sub(image.firstDetected), "minAge", im.policy.MinAge)
imagesLeft = append(imagesLeft, image.id)
continue
}
if err := im.freeImage(ctx, image, ImageGarbageCollectedTotalReasonSpace); err != nil {
deletionErrors = append(deletionErrors, err)
imagesLeft = append(imagesLeft, image.id)
continue
}
spaceFreed += image.size
@ -475,9 +502,9 @@ func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64,
}
if len(deletionErrors) > 0 {
return spaceFreed, fmt.Errorf("wanted to free %d bytes, but freed %d bytes space with errors in image deletion: %v", bytesToFree, spaceFreed, errors.NewAggregate(deletionErrors))
return nil, spaceFreed, fmt.Errorf("wanted to free %d bytes, but freed %d bytes space with errors in image deletion: %w", bytesToFree, spaceFreed, errors.NewAggregate(deletionErrors))
}
return spaceFreed, nil
return imagesLeft, spaceFreed, nil
}
func (im *realImageGCManager) freeImage(ctx context.Context, image evictionInfo, reason string) error {

View File

@ -740,7 +740,7 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) {
func getImagesAndFreeSpace(ctx context.Context, t *testing.T, assert *assert.Assertions, im *realImageGCManager, fakeRuntime *containertest.FakeRuntime, spaceToFree, expectedSpaceFreed int64, imagesLen int, freeTime time.Time) {
images, err := im.imagesInEvictionOrder(ctx, freeTime)
require.NoError(t, err)
spaceFreed, err := im.freeSpace(ctx, spaceToFree, freeTime, images)
_, spaceFreed, err := im.freeSpace(ctx, spaceToFree, freeTime, images)
require.NoError(t, err)
assert.EqualValues(expectedSpaceFreed, spaceFreed)
assert.Len(fakeRuntime.ImageList, imagesLen)
@ -910,7 +910,7 @@ func TestValidateImageGCPolicy(t *testing.T) {
}
for _, tc := range testCases {
if _, err := NewImageGCManager(nil, nil, nil, nil, tc.imageGCPolicy, noopoteltrace.NewTracerProvider()); err != nil {
if _, err := NewImageGCManager(nil, nil, nil, nil, nil, tc.imageGCPolicy, noopoteltrace.NewTracerProvider()); err != nil {
if err.Error() != tc.expectErr {
t.Errorf("[%s:]Expected err:%v, but got:%v", tc.name, tc.expectErr, err.Error())
}

View File

@ -274,8 +274,30 @@ func (f *PullManager) MustAttemptImagePull(image, imageRef string, podSecrets []
}
// PruneUnknownRecords garbage-collects ImagePulledRecords for images that no
// longer appear in imageList. A record is only deleted when it was last
// updated before `until` (the time the GC run started); records touched after
// the GC began are kept, since the corresponding image may have just been
// pulled and would not appear in the (stale) image list.
func (f *PullManager) PruneUnknownRecords(imageList []string, until time.Time) {
	f.pulledAccessors.GlobalLock()
	defer f.pulledAccessors.GlobalUnlock()

	pulledRecords, err := f.recordsAccessor.ListImagePulledRecords()
	if err != nil {
		// Best effort: prune whatever records we could list.
		klog.ErrorS(err, "there were errors listing ImagePulledRecords, garbage collection will proceed with incomplete records list")
	}

	imagesInUse := sets.New(imageList...)
	for _, imageRecord := range pulledRecords {
		if !imageRecord.LastUpdatedTime.Time.Before(until) {
			// the image record was only updated after the GC started
			continue
		}

		if imagesInUse.Has(imageRecord.ImageRef) {
			continue
		}

		if err := f.recordsAccessor.DeleteImagePulledRecord(imageRecord.ImageRef); err != nil {
			klog.ErrorS(err, "failed to remove an ImagePulledRecord", "imageRef", imageRecord.ImageRef)
		}
	}
}
// initialize gathers all the images from pull intent records that exist
@ -288,8 +310,7 @@ func (f *PullManager) PruneUnknownRecords(imageList []string, until time.Time) {
func (f *PullManager) initialize(ctx context.Context) {
pullIntents, err := f.recordsAccessor.ListImagePullIntents()
if err != nil {
klog.ErrorS(err, "there was an error listing ImagePullIntents")
return
klog.ErrorS(err, "there were errors listing ImagePullIntents, continuing with an incomplete records list")
}
if len(pullIntents) == 0 {

View File

@ -17,6 +17,7 @@ limitations under the License.
package images
import (
"io/fs"
"os"
"path/filepath"
"reflect"
@ -873,6 +874,113 @@ func TestFileBasedImagePullManager_initialize(t *testing.T) {
}
}
// TestFileBasedImagePullManager_PruneUnknownRecords verifies that
// PruneUnknownRecords deletes only the on-disk ImagePulledRecord files whose
// image is missing from the supplied image list, and keeps records for images
// that are still present.
func TestFileBasedImagePullManager_PruneUnknownRecords(t *testing.T) {
	tests := []struct {
		// name identifies the test case.
		name string
		// imageList is the set of images the GC reports as still on the node.
		imageList []string
		// gcStartTime is passed as the `until` cutoff to PruneUnknownRecords.
		gcStartTime time.Time
		// pulledFiles are testdata record files staged into the pulled dir.
		pulledFiles []string
		// wantFiles is the expected set of files remaining after pruning;
		// nil means all staged files are expected to be removed.
		wantFiles sets.Set[string]
	}{
		{
			name:        "all images present",
			imageList:   []string{"testimage-anonpull", "testimageref", "testemptycredmapping"},
			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
			pulledFiles: []string{
				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
			},
			wantFiles: sets.New(
				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
			),
		},
		{
			name:        "remove all records on empty list from the GC",
			imageList:   []string{},
			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
			pulledFiles: []string{
				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
			},
		},
		{
			name:        "remove all records on list of untracked images from the GC",
			imageList:   []string{"untracked1", "different-untracked"},
			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
			pulledFiles: []string{
				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
			},
		},
		{
			name:        "remove records without a match in the image list from the GC",
			imageList:   []string{"testimage-anonpull", "untracked1", "testimageref", "different-untracked"},
			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
			pulledFiles: []string{
				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
			},
			wantFiles: sets.New(
				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
			),
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			encoder, decoder, err := createKubeletConfigSchemeEncoderDecoder()
			require.NoError(t, err)

			// Stage the pulled-record fixtures in a throwaway directory.
			testDir := t.TempDir()
			pulledDir := filepath.Join(testDir, "pulled")
			if err := os.MkdirAll(pulledDir, 0700); err != nil {
				t.Fatalf("failed to create testing dir %q: %v", pulledDir, err)
			}
			copyTestData(t, pulledDir, "pulled", tt.pulledFiles)

			fsRecordAccessor := &fsPullRecordsAccessor{
				pulledDir: pulledDir,
				encoder:   encoder,
				decoder:   decoder,
			}

			f := &PullManager{
				recordsAccessor: fsRecordAccessor,
				pulledAccessors: NewStripedLockSet(10),
			}

			f.PruneUnknownRecords(tt.imageList, tt.gcStartTime)

			// Collect the record files that survived pruning.
			filesLeft := sets.New[string]()
			err = filepath.Walk(pulledDir, func(path string, info fs.FileInfo, err error) error {
				if err != nil {
					return err
				}
				// Skip the directory entry itself; only count record files.
				if path == pulledDir {
					return nil
				}
				filesLeft.Insert(info.Name())
				return nil
			})
			if err != nil {
				t.Fatalf("failed to walk the pull dir after prune: %v", err)
			}

			if !tt.wantFiles.Equal(filesLeft) {
				t.Errorf("expected equal sets, diff: %s", cmp.Diff(tt.wantFiles, filesLeft))
			}
		})
	}
}
func copyTestData(t *testing.T, dstDir string, testdataDir string, src []string) {
for _, f := range src {
testBytes, err := os.ReadFile(filepath.Join("testdata", testdataDir, f))

View File

@ -759,7 +759,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
runtime, postImageGCHooks, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
klet.readinessManager,
@ -883,7 +883,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, max(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, kubeDeps.TracerProvider)
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, postImageGCHooks, kubeDeps.Recorder, nodeRef, imageGCPolicy, kubeDeps.TracerProvider)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}

View File

@ -322,7 +322,7 @@ func newTestKubeletWithImageList(
HighThresholdPercent: 90,
LowThresholdPercent: 80,
}
imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, noopoteltrace.NewTracerProvider())
imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, nil, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, noopoteltrace.NewTracerProvider())
assert.NoError(t, err)
kubelet.imageManager = &fakeImageGCManager{
fakeImageService: fakeRuntime,
@ -3394,7 +3394,7 @@ func TestSyncPodSpans(t *testing.T) {
imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp, &logger)
assert.NoError(t, err)
kubelet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
kubelet.containerRuntime, _, err = kuberuntime.NewKubeGenericRuntimeManager(
kubelet.recorder,
kubelet.livenessManager,
kubelet.readinessManager,

View File

@ -227,7 +227,7 @@ func NewKubeGenericRuntimeManager(
tracerProvider trace.TracerProvider,
tokenManager *token.Manager,
getServiceAccount plugin.GetServiceAccountFunc,
) (KubeGenericRuntime, error) {
) (KubeGenericRuntime, []images.PostImageGCHook, error) {
ctx := context.Background()
runtimeService = newInstrumentedRuntimeService(runtimeService)
imageService = newInstrumentedImageManagerService(imageService)
@ -262,7 +262,7 @@ func NewKubeGenericRuntimeManager(
typedVersion, err := kubeRuntimeManager.getTypedVersion(ctx)
if err != nil {
klog.ErrorS(err, "Get runtime version failed")
return nil, err
return nil, nil, err
}
// Only matching kubeRuntimeAPIVersion is supported now
@ -271,7 +271,7 @@ func NewKubeGenericRuntimeManager(
klog.ErrorS(err, "This runtime api version is not supported",
"apiVersion", typedVersion.Version,
"supportedAPIVersion", kubeRuntimeAPIVersion)
return nil, ErrVersionNotSupported
return nil, nil, ErrVersionNotSupported
}
kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
@ -287,6 +287,7 @@ func NewKubeGenericRuntimeManager(
}
}
var imageGCHooks []images.PostImageGCHook
var imagePullManager images.ImagePullManager = &images.NoopImagePullManager{}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
imagePullCredentialsVerificationPolicy, err := images.NewImagePullCredentialVerificationPolicy(
@ -294,18 +295,20 @@ func NewKubeGenericRuntimeManager(
preloadedImagesCredentialVerificationWhitelist)
if err != nil {
return nil, err
return nil, nil, err
}
fsRecordAccessor, err := images.NewFSPullRecordsAccessor(rootDirectory)
if err != nil {
return nil, fmt.Errorf("failed to setup the FSPullRecordsAccessor: %w", err)
return nil, nil, fmt.Errorf("failed to setup the FSPullRecordsAccessor: %w", err)
}
imagePullManager, err = images.NewImagePullManager(ctx, fsRecordAccessor, imagePullCredentialsVerificationPolicy, kubeRuntimeManager, ptr.Deref(maxParallelImagePulls, 0))
if err != nil {
return nil, fmt.Errorf("failed to create image pull manager: %w", err)
return nil, nil, fmt.Errorf("failed to create image pull manager: %w", err)
}
imageGCHooks = append(imageGCHooks, imagePullManager.PruneUnknownRecords)
}
nodeKeyring := credentialprovider.NewDefaultDockerKeyring()
@ -331,7 +334,7 @@ func NewKubeGenericRuntimeManager(
versionCacheTTL,
)
return kubeRuntimeManager, nil
return kubeRuntimeManager, imageGCHooks, nil
}
// Type returns the type of the container runtime.