From b8fc6042ca0870eef24a697708fb54d232504ffd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Stanislav=20L=C3=A1zni=C4=8Dka?=
Date: Wed, 23 Oct 2024 14:08:56 +0200
Subject: [PATCH] kubelet: GC for image pull managers

---
 pkg/kubelet/images/image_gc_manager.go        |  45 ++++++--
 pkg/kubelet/images/image_gc_manager_test.go   |   4 +-
 pkg/kubelet/images/image_pull_manager.go      |  29 ++++-
 pkg/kubelet/images/image_pull_manager_test.go | 108 ++++++++++++++++++
 pkg/kubelet/kubelet.go                        |   4 +-
 pkg/kubelet/kubelet_test.go                   |   4 +-
 .../kuberuntime/kuberuntime_manager.go        |  17 +--
 7 files changed, 185 insertions(+), 26 deletions(-)

diff --git a/pkg/kubelet/images/image_gc_manager.go b/pkg/kubelet/images/image_gc_manager.go
index b13ec8f470b..60650376222 100644
--- a/pkg/kubelet/images/image_gc_manager.go
+++ b/pkg/kubelet/images/image_gc_manager.go
@@ -57,6 +57,11 @@ const (
 	ImageGarbageCollectedTotalReasonSpace = "space"
 )
 
+// PostImageGCHook allows external sources to react to image garbage collection events.
+// `remainingImages` is a list of images that were left on the system after garbage
+// collection finished.
+type PostImageGCHook func(remainingImages []string, gcStart time.Time)
+
 // StatsProvider is an interface for fetching stats used during image garbage
 // collection.
 type StatsProvider interface {
@@ -128,6 +133,8 @@ type realImageGCManager struct {
 	// imageCache is the cache of latest image list.
 	imageCache imageCache
 
+	postGCHooks []PostImageGCHook
+
 	// tracer for recording spans
 	tracer trace.Tracer
 }
@@ -181,7 +188,7 @@ type imageRecord struct {
 }
 
 // NewImageGCManager instantiates a new ImageGCManager object.
-func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, tracerProvider trace.TracerProvider) (ImageGCManager, error) {
+func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, postGCHooks []PostImageGCHook, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, tracerProvider trace.TracerProvider) (ImageGCManager, error) {
 	// Validate policy.
 	if policy.HighThresholdPercent < 0 || policy.HighThresholdPercent > 100 {
 		return nil, fmt.Errorf("invalid HighThresholdPercent %d, must be in range [0-100]", policy.HighThresholdPercent)
@@ -200,6 +207,7 @@ func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, r
 		statsProvider: statsProvider,
 		recorder:      recorder,
 		nodeRef:       nodeRef,
+		postGCHooks:   postGCHooks,
 		tracer:        tracer,
 	}
@@ -381,11 +389,13 @@ func (im *realImageGCManager) GarbageCollect(ctx context.Context, beganGC time.T
 	if usagePercent >= im.policy.HighThresholdPercent {
 		amountToFree := capacity*int64(100-im.policy.LowThresholdPercent)/100 - available
 		klog.InfoS("Disk usage on image filesystem is over the high threshold, trying to free bytes down to the low threshold", "usage", usagePercent, "highThreshold", im.policy.HighThresholdPercent, "amountToFree", amountToFree, "lowThreshold", im.policy.LowThresholdPercent)
-		freed, err := im.freeSpace(ctx, amountToFree, freeTime, images)
+		remainingImages, freed, err := im.freeSpace(ctx, amountToFree, freeTime, images)
 		if err != nil {
 			return err
 		}
 
+		im.runPostGCHooks(remainingImages, freeTime)
+
 		if freed < amountToFree {
 			err := fmt.Errorf("Failed to garbage collect required amount of images. Attempted to free %d bytes, but only found %d bytes eligible to free.", amountToFree, freed)
 			im.recorder.Eventf(im.nodeRef, v1.EventTypeWarning, events.FreeDiskSpaceFailed, err.Error())
@@ -396,6 +406,12 @@ func (im *realImageGCManager) GarbageCollect(ctx context.Context, beganGC time.T
 	return nil
 }
 
+func (im *realImageGCManager) runPostGCHooks(remainingImages []string, gcStartTime time.Time) {
+	for _, h := range im.postGCHooks {
+		h(remainingImages, gcStartTime)
+	}
+}
+
 func (im *realImageGCManager) freeOldImages(ctx context.Context, images []evictionInfo, freeTime, beganGC time.Time) ([]evictionInfo, error) {
 	if im.policy.MaxAge == 0 {
 		return images, nil
@@ -430,29 +446,38 @@ func (im *realImageGCManager) freeOldImages(ctx context.Context, images []evicti
 func (im *realImageGCManager) DeleteUnusedImages(ctx context.Context) error {
 	klog.InfoS("Attempting to delete unused images")
 	freeTime := time.Now()
+
 	images, err := im.imagesInEvictionOrder(ctx, freeTime)
 	if err != nil {
 		return err
 	}
-	_, err = im.freeSpace(ctx, math.MaxInt64, freeTime, images)
-	return err
+
+	remainingImages, _, err := im.freeSpace(ctx, math.MaxInt64, freeTime, images)
+	if err != nil {
+		return err
+	}
+
+	im.runPostGCHooks(remainingImages, freeTime)
+	return nil
 }
 
 // Tries to free bytesToFree worth of images on the disk.
 //
-// Returns the number of bytes free and an error if any occurred. The number of
-// bytes freed is always returned.
+// Returns the images that are still available after the cleanup, the number of bytes freed
+// and an error if any occurred. The number of bytes freed is always returned.
 // Note that error may be nil and the number of bytes free may be less
 // than bytesToFree.
-func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64, freeTime time.Time, images []evictionInfo) (int64, error) {
+func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64, freeTime time.Time, images []evictionInfo) ([]string, int64, error) {
 	// Delete unused images until we've freed up enough space.
 	var deletionErrors []error
 	spaceFreed := int64(0)
+	var imagesLeft []string
 	for _, image := range images {
 		klog.V(5).InfoS("Evaluating image ID for possible garbage collection based on disk usage", "imageID", image.id, "runtimeHandler", image.imageRecord.runtimeHandlerUsedToPullImage)
 		// Images that are currently in used were given a newer lastUsed.
 		if image.lastUsed.Equal(freeTime) || image.lastUsed.After(freeTime) {
 			klog.V(5).InfoS("Image ID was used too recently, not eligible for garbage collection", "imageID", image.id, "lastUsed", image.lastUsed, "freeTime", freeTime)
+			imagesLeft = append(imagesLeft, image.id)
 			continue
 		}
 
@@ -460,11 +485,13 @@ func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64,
 		// In such a case, the image may have just been pulled down, and will be used by a container right away.
 		if freeTime.Sub(image.firstDetected) < im.policy.MinAge {
 			klog.V(5).InfoS("Image ID's age is less than the policy's minAge, not eligible for garbage collection", "imageID", image.id, "age", freeTime.Sub(image.firstDetected), "minAge", im.policy.MinAge)
+			imagesLeft = append(imagesLeft, image.id)
 			continue
 		}
 
 		if err := im.freeImage(ctx, image, ImageGarbageCollectedTotalReasonSpace); err != nil {
 			deletionErrors = append(deletionErrors, err)
+			imagesLeft = append(imagesLeft, image.id)
 			continue
 		}
 		spaceFreed += image.size
@@ -475,9 +502,9 @@ func (im *realImageGCManager) freeSpace(ctx context.Context, bytesToFree int64,
 	}
 
 	if len(deletionErrors) > 0 {
-		return spaceFreed, fmt.Errorf("wanted to free %d bytes, but freed %d bytes space with errors in image deletion: %v", bytesToFree, spaceFreed, errors.NewAggregate(deletionErrors))
+		return nil, spaceFreed, fmt.Errorf("wanted to free %d bytes, but freed %d bytes space with errors in image deletion: %w", bytesToFree, spaceFreed, errors.NewAggregate(deletionErrors))
 	}
-	return spaceFreed, nil
+	return imagesLeft, spaceFreed, nil
 }
 
 func (im *realImageGCManager) freeImage(ctx context.Context, image evictionInfo, reason string) error {
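The image GC manager change above is the consumer side of the new hook type: after every collection pass it fans the list of surviving images out to each registered PostImageGCHook. A minimal standalone sketch of that contract follows; every name in it is illustrative and not part of the patch:

package main

import (
	"fmt"
	"time"
)

// postImageGCHook mirrors the PostImageGCHook type introduced in
// image_gc_manager.go: hooks receive the images that survived a GC pass
// and the time the pass started.
type postImageGCHook func(remainingImages []string, gcStart time.Time)

// runHooks mirrors runPostGCHooks: every hook sees the same snapshot.
func runHooks(hooks []postImageGCHook, remaining []string, gcStart time.Time) {
	for _, h := range hooks {
		h(remaining, gcStart)
	}
}

func main() {
	logHook := func(remaining []string, gcStart time.Time) {
		fmt.Printf("GC started %s; %d images remain\n", gcStart.Format(time.RFC3339), len(remaining))
	}
	runHooks([]postImageGCHook{logHook}, []string{"sha256:abc123"}, time.Now())
}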
diff --git a/pkg/kubelet/images/image_gc_manager_test.go b/pkg/kubelet/images/image_gc_manager_test.go
index ffe5297267f..332c12cb026 100644
--- a/pkg/kubelet/images/image_gc_manager_test.go
+++ b/pkg/kubelet/images/image_gc_manager_test.go
@@ -740,7 +740,7 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) {
 func getImagesAndFreeSpace(ctx context.Context, t *testing.T, assert *assert.Assertions, im *realImageGCManager, fakeRuntime *containertest.FakeRuntime, spaceToFree, expectedSpaceFreed int64, imagesLen int, freeTime time.Time) {
 	images, err := im.imagesInEvictionOrder(ctx, freeTime)
 	require.NoError(t, err)
-	spaceFreed, err := im.freeSpace(ctx, spaceToFree, freeTime, images)
+	_, spaceFreed, err := im.freeSpace(ctx, spaceToFree, freeTime, images)
 	require.NoError(t, err)
 	assert.EqualValues(expectedSpaceFreed, spaceFreed)
 	assert.Len(fakeRuntime.ImageList, imagesLen)
@@ -910,7 +910,7 @@ func TestValidateImageGCPolicy(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		if _, err := NewImageGCManager(nil, nil, nil, nil, tc.imageGCPolicy, noopoteltrace.NewTracerProvider()); err != nil {
+		if _, err := NewImageGCManager(nil, nil, nil, nil, nil, tc.imageGCPolicy, noopoteltrace.NewTracerProvider()); err != nil {
 			if err.Error() != tc.expectErr {
 				t.Errorf("[%s:]Expected err:%v, but got:%v", tc.name, tc.expectErr, err.Error())
 			}
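The test updates above track the widened freeSpace signature: callers now receive the surviving image IDs ahead of the byte count. A toy version of that three-value contract, with a plain slice standing in for the kubelet's evictionInfo records (nothing below comes from the patch itself):

package main

import "fmt"

type img struct {
	id   string
	size int64
}

// freeSpace imitates the patched return shape: surviving image IDs come back
// alongside the bytes freed so post-GC hooks can consume them.
func freeSpace(bytesToFree int64, images []img) (remaining []string, freed int64) {
	for _, image := range images {
		if freed >= bytesToFree {
			remaining = append(remaining, image.id) // target met; image survives
			continue
		}
		freed += image.size
	}
	return remaining, freed
}

func main() {
	remaining, freed := freeSpace(500, []img{{"a", 300}, {"b", 300}, {"c", 300}})
	fmt.Println(remaining, freed) // [c] 600
}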
diff --git a/pkg/kubelet/images/image_pull_manager.go b/pkg/kubelet/images/image_pull_manager.go
index e402e8b7bba..5407fb64854 100644
--- a/pkg/kubelet/images/image_pull_manager.go
+++ b/pkg/kubelet/images/image_pull_manager.go
@@ -274,8 +274,30 @@ func (f *PullManager) MustAttemptImagePull(image, imageRef string, podSecrets []
 }
 
 func (f *PullManager) PruneUnknownRecords(imageList []string, until time.Time) {
-	// TODO: also cleanup the lock maps for intent/pull records?
-	panic("implement me")
+	f.pulledAccessors.GlobalLock()
+	defer f.pulledAccessors.GlobalUnlock()
+
+	pulledRecords, err := f.recordsAccessor.ListImagePulledRecords()
+	if err != nil {
+		klog.ErrorS(err, "there were errors listing ImagePulledRecords, garbage collection will proceed with an incomplete records list")
+	}
+
+	imagesInUse := sets.New(imageList...)
+	for _, imageRecord := range pulledRecords {
+		if !imageRecord.LastUpdatedTime.Time.Before(until) {
+			// the image record was only updated after the GC started
+			continue
+		}
+
+		if imagesInUse.Has(imageRecord.ImageRef) {
+			continue
+		}
+
+		if err := f.recordsAccessor.DeleteImagePulledRecord(imageRecord.ImageRef); err != nil {
+			klog.ErrorS(err, "failed to remove an ImagePulledRecord", "imageRef", imageRecord.ImageRef)
+		}
+	}
 }
 
 // initialize gathers all the images from pull intent records that exist
@@ -288,8 +310,7 @@ func (f *PullManager) PruneUnknownRecords(imageList []string, until time.Time) {
 func (f *PullManager) initialize(ctx context.Context) {
 	pullIntents, err := f.recordsAccessor.ListImagePullIntents()
 	if err != nil {
-		klog.ErrorS(err, "there was an error listing ImagePullIntents")
-		return
+		klog.ErrorS(err, "there were errors listing ImagePullIntents, continuing with an incomplete records list")
 	}
 
 	if len(pullIntents) == 0 {
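Note the ordering guard in PruneUnknownRecords above: a record whose LastUpdatedTime is not before the GC start time is skipped, because it may describe an image pulled while the GC pass was already running, and such an image would be missing from the GC's snapshot of the image list. A reduced sketch of that predicate, using a plain map where the real code uses k8s.io/apimachinery sets (names are hypothetical):

package main

import (
	"fmt"
	"time"
)

// shouldPrune reproduces the two checks from PruneUnknownRecords: the record
// must predate the GC pass, and its image must no longer be on the node.
func shouldPrune(lastUpdated, gcStart time.Time, imageRef string, imagesInUse map[string]bool) bool {
	if !lastUpdated.Before(gcStart) {
		return false // updated after GC began; a concurrent pull may own it
	}
	return !imagesInUse[imageRef]
}

func main() {
	gcStart := time.Now()
	inUse := map[string]bool{"sha256:live": true}
	fmt.Println(shouldPrune(gcStart.Add(-time.Hour), gcStart, "sha256:gone", inUse))  // true: stale and unused
	fmt.Println(shouldPrune(gcStart.Add(time.Second), gcStart, "sha256:gone", inUse)) // false: too fresh
	fmt.Println(shouldPrune(gcStart.Add(-time.Hour), gcStart, "sha256:live", inUse))  // false: still present
}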
diff --git a/pkg/kubelet/images/image_pull_manager_test.go b/pkg/kubelet/images/image_pull_manager_test.go
index 2324ced90cf..320375630f7 100644
--- a/pkg/kubelet/images/image_pull_manager_test.go
+++ b/pkg/kubelet/images/image_pull_manager_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package images
 
 import (
+	"io/fs"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -873,6 +874,113 @@ func TestFileBasedImagePullManager_initialize(t *testing.T) {
 	}
 }
 
+func TestFileBasedImagePullManager_PruneUnknownRecords(t *testing.T) {
+	tests := []struct {
+		name        string
+		imageList   []string
+		gcStartTime time.Time
+		pulledFiles []string
+		wantFiles   sets.Set[string]
+	}{
+		{
+			name:        "all images present",
+			imageList:   []string{"testimage-anonpull", "testimageref", "testemptycredmapping"},
+			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
+			pulledFiles: []string{
+				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
+				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
+				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
+			},
+			wantFiles: sets.New(
+				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
+				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
+				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
+			),
+		},
+		{
+			name:        "remove all records on empty list from the GC",
+			imageList:   []string{},
+			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
+			pulledFiles: []string{
+				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
+				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
+				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
+			},
+		},
+		{
+			name:        "remove all records on list of untracked images from the GC",
+			imageList:   []string{"untracked1", "different-untracked"},
+			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
+			pulledFiles: []string{
+				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
+				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
+				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
+			},
+		},
+		{
+			name:        "remove records without a match in the image list from the GC",
+			imageList:   []string{"testimage-anonpull", "untracked1", "testimageref", "different-untracked"},
+			gcStartTime: time.Date(2024, 12, 25, 00, 01, 00, 00, time.UTC),
+			pulledFiles: []string{
+				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
+				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
+				"sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991",
+			},
+			wantFiles: sets.New(
+				"sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a",
+				"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064",
+			),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			encoder, decoder, err := createKubeletConfigSchemeEncoderDecoder()
+			require.NoError(t, err)
+
+			testDir := t.TempDir()
+			pulledDir := filepath.Join(testDir, "pulled")
+			if err := os.MkdirAll(pulledDir, 0700); err != nil {
+				t.Fatalf("failed to create testing dir %q: %v", pulledDir, err)
+			}
+
+			copyTestData(t, pulledDir, "pulled", tt.pulledFiles)
+
+			fsRecordAccessor := &fsPullRecordsAccessor{
+				pulledDir: pulledDir,
+				encoder:   encoder,
+				decoder:   decoder,
+			}
+
+			f := &PullManager{
+				recordsAccessor: fsRecordAccessor,
+				pulledAccessors: NewStripedLockSet(10),
+			}
+			f.PruneUnknownRecords(tt.imageList, tt.gcStartTime)
+
+			filesLeft := sets.New[string]()
+			err = filepath.Walk(pulledDir, func(path string, info fs.FileInfo, err error) error {
+				if err != nil {
+					return err
+				}
+
+				if path == pulledDir {
+					return nil
+				}
+
+				filesLeft.Insert(info.Name())
+				return nil
+			})
+			if err != nil {
+				t.Fatalf("failed to walk the pull dir after prune: %v", err)
+			}
+
+			if !tt.wantFiles.Equal(filesLeft) {
+				t.Errorf("expected equal sets, diff: %s", cmp.Diff(tt.wantFiles, filesLeft))
+			}
+		})
+	}
+}
+
 func copyTestData(t *testing.T, dstDir string, testdataDir string, src []string) {
 	for _, f := range src {
 		testBytes, err := os.ReadFile(filepath.Join("testdata", testdataDir, f))
"sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064", + "sha256-f8778b6393eaf39315e767a58cbeacf2c4b270d94b4d6926ee993d9e49444991", + }, + wantFiles: sets.New( + "sha256-a2eace2182b24cdbbb730798e47b10709b9ef5e0f0c1624a3bc06c8ca987727a", + "sha256-b3c0cc4278800b03a308ceb2611161430df571ca733122f0a40ac8b9792a9064", + ), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + encoder, decoder, err := createKubeletConfigSchemeEncoderDecoder() + require.NoError(t, err) + + testDir := t.TempDir() + pulledDir := filepath.Join(testDir, "pulled") + if err := os.MkdirAll(pulledDir, 0700); err != nil { + t.Fatalf("failed to create testing dir %q: %v", pulledDir, err) + } + + copyTestData(t, pulledDir, "pulled", tt.pulledFiles) + + fsRecordAccessor := &fsPullRecordsAccessor{ + pulledDir: pulledDir, + encoder: encoder, + decoder: decoder, + } + + f := &PullManager{ + recordsAccessor: fsRecordAccessor, + pulledAccessors: NewStripedLockSet(10), + } + f.PruneUnknownRecords(tt.imageList, tt.gcStartTime) + + filesLeft := sets.New[string]() + err = filepath.Walk(pulledDir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + + if path == pulledDir { + return nil + } + + filesLeft.Insert(info.Name()) + return nil + }) + if err != nil { + t.Fatalf("failed to walk the pull dir after prune: %v", err) + } + + if !tt.wantFiles.Equal(filesLeft) { + t.Errorf("expected equal sets, diff: %s", cmp.Diff(tt.wantFiles, filesLeft)) + } + }) + } +} + func copyTestData(t *testing.T, dstDir string, testdataDir string, src []string) { for _, f := range src { testBytes, err := os.ReadFile(filepath.Join("testdata", testdataDir, f)) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3902ddbd302..b03edd716db 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -759,7 +759,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } } - runtime, err := kuberuntime.NewKubeGenericRuntimeManager( + runtime, postImageGCHooks, err := kuberuntime.NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, klet.readinessManager, @@ -883,7 +883,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, max(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod)) // setup imageManager - imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, kubeDeps.TracerProvider) + imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, postImageGCHooks, kubeDeps.Recorder, nodeRef, imageGCPolicy, kubeDeps.TracerProvider) if err != nil { return nil, fmt.Errorf("failed to initialize image manager: %v", err) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 6a5d75f8ece..b431135d2f8 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -322,7 +322,7 @@ func newTestKubeletWithImageList( HighThresholdPercent: 90, LowThresholdPercent: 80, } - imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, noopoteltrace.NewTracerProvider()) + imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, nil, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, noopoteltrace.NewTracerProvider()) assert.NoError(t, err) kubelet.imageManager = 
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
index 90ff28fd6b1..c616cb56940 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -227,7 +227,7 @@ func NewKubeGenericRuntimeManager(
 	tracerProvider trace.TracerProvider,
 	tokenManager *token.Manager,
 	getServiceAccount plugin.GetServiceAccountFunc,
-) (KubeGenericRuntime, error) {
+) (KubeGenericRuntime, []images.PostImageGCHook, error) {
 	ctx := context.Background()
 	runtimeService = newInstrumentedRuntimeService(runtimeService)
 	imageService = newInstrumentedImageManagerService(imageService)
@@ -262,7 +262,7 @@ func NewKubeGenericRuntimeManager(
 	typedVersion, err := kubeRuntimeManager.getTypedVersion(ctx)
 	if err != nil {
 		klog.ErrorS(err, "Get runtime version failed")
-		return nil, err
+		return nil, nil, err
 	}
 
 	// Only matching kubeRuntimeAPIVersion is supported now
@@ -271,7 +271,7 @@ func NewKubeGenericRuntimeManager(
 		klog.ErrorS(err, "This runtime api version is not supported",
 			"apiVersion", typedVersion.Version,
 			"supportedAPIVersion", kubeRuntimeAPIVersion)
-		return nil, ErrVersionNotSupported
+		return nil, nil, ErrVersionNotSupported
 	}
 
 	kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
@@ -287,6 +287,7 @@ func NewKubeGenericRuntimeManager(
 		}
 	}
 
+	var imageGCHooks []images.PostImageGCHook
 	var imagePullManager images.ImagePullManager = &images.NoopImagePullManager{}
 	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
 		imagePullCredentialsVerificationPolicy, err := images.NewImagePullCredentialVerificationPolicy(
@@ -294,18 +295,20 @@ func NewKubeGenericRuntimeManager(
 			preloadedImagesCredentialVerificationWhitelist)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 
 		fsRecordAccessor, err := images.NewFSPullRecordsAccessor(rootDirectory)
 		if err != nil {
-			return nil, fmt.Errorf("failed to setup the FSPullRecordsAccessor: %w", err)
+			return nil, nil, fmt.Errorf("failed to setup the FSPullRecordsAccessor: %w", err)
 		}
 
 		imagePullManager, err = images.NewImagePullManager(ctx, fsRecordAccessor, imagePullCredentialsVerificationPolicy, kubeRuntimeManager, ptr.Deref(maxParallelImagePulls, 0))
 		if err != nil {
-			return nil, fmt.Errorf("failed to create image pull manager: %w", err)
+			return nil, nil, fmt.Errorf("failed to create image pull manager: %w", err)
 		}
+
+		imageGCHooks = append(imageGCHooks, imagePullManager.PruneUnknownRecords)
 	}
 
 	nodeKeyring := credentialprovider.NewDefaultDockerKeyring()
@@ -331,7 +334,7 @@ func NewKubeGenericRuntimeManager(
 		versionCacheTTL,
 	)
 
-	return kubeRuntimeManager, nil
+	return kubeRuntimeManager, imageGCHooks, nil
 }
 
 // Type returns the type of the container runtime.
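Taken together, the plumbing runs in one direction: NewKubeGenericRuntimeManager produces the hook (the pull manager's PruneUnknownRecords method, only when the KubeletEnsureSecretPulledImages gate is enabled), NewMainKubelet passes the slice into images.NewImageGCManager, and the GC manager invokes it after each pass. A condensed sketch of that flow with toy stand-ins for all three components (only the shape follows the patch):

package main

import (
	"fmt"
	"time"
)

type postImageGCHook func(remainingImages []string, gcStart time.Time)

// newRuntimeManager stands in for NewKubeGenericRuntimeManager: it returns
// the pruning hook only when the feature gate is on.
func newRuntimeManager(gateEnabled bool, prune postImageGCHook) []postImageGCHook {
	if gateEnabled {
		return []postImageGCHook{prune}
	}
	return nil
}

// newImageGCManager stands in for images.NewImageGCManager; the returned
// function plays the role of a GC pass that runs the hooks afterwards.
func newImageGCManager(hooks []postImageGCHook) func(remaining []string) {
	return func(remaining []string) {
		gcStart := time.Now()
		for _, h := range hooks {
			h(remaining, gcStart)
		}
	}
}

func main() {
	prune := func(remaining []string, gcStart time.Time) {
		fmt.Printf("pruning pull records not in %v (GC started %s)\n", remaining, gcStart.Format(time.RFC3339))
	}
	gcPass := newImageGCManager(newRuntimeManager(true, prune))
	gcPass([]string{"sha256:kept"})
}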