kubelet: pullmanager: move to a separate package

This commit is contained in:
Stanislav Láznička 2024-11-06 21:08:46 +01:00
parent b8fc6042ca
commit 788b7abe40
No known key found for this signature in database
GPG Key ID: F8D8054395A1D157
23 changed files with 190 additions and 122 deletions

View File

@ -32,7 +32,7 @@ import (
tracingapi "k8s.io/component-base/tracing/api/v1"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/images"
imagepullmanager "k8s.io/kubernetes/pkg/kubelet/images/pullmanager"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
utiltaints "k8s.io/kubernetes/pkg/util/taints"
@ -301,7 +301,7 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
if len(kc.PreloadedImagesVerificationAllowlist) > 0 && kc.ImagePullCredentialsVerificationPolicy != string(kubeletconfig.NeverVerifyAllowlistedImages) {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: can't set `preloadedImagesVerificationAllowlist` if `imagePullCredentialsVertificationPolicy` is not \"NeverVerifyAllowlistedImages\""))
} else if err := images.ValidateAllowlistImagesPatterns(kc.PreloadedImagesVerificationAllowlist); err != nil {
} else if err := imagepullmanager.ValidateAllowlistImagesPatterns(kc.PreloadedImagesVerificationAllowlist); err != nil {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: invalid image pattern in `preloadedImagesVerificationAllowlist`: %w", err))
}
} else {

View File

@ -39,6 +39,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images/pullmanager"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/util/parsers"
)
@ -52,7 +53,7 @@ type ImagePodPullingTimeRecorder interface {
type imageManager struct {
recorder record.EventRecorder
imageService kubecontainer.ImageService
imagePullManager ImagePullManager
imagePullManager pullmanager.ImagePullManager
backOff *flowcontrol.Backoff
prevPullErrMsg sync.Map
@ -70,7 +71,7 @@ func NewImageManager(
recorder record.EventRecorder,
nodeKeyring credentialprovider.DockerKeyring,
imageService kubecontainer.ImageService,
imagePullManager ImagePullManager,
imagePullManager pullmanager.ImagePullManager,
imageBackOff *flowcontrol.Backoff,
serialized bool,
maxParallelImagePulls *int32,

View File

@ -42,6 +42,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
. "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/images/pullmanager"
"k8s.io/kubernetes/test/utils/ktesting"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
@ -513,7 +514,7 @@ func (m *mockPodPullingTimeRecorder) reset() {
}
type mockImagePullManager struct {
NoopImagePullManager
pullmanager.NoopImagePullManager
imageAllowlist map[string]sets.Set[kubeletconfiginternal.ImagePullSecret]
allowAll bool

View File

@ -0,0 +1,19 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// pullmanager package keeps the implementation of the image pull manager and
// image credential verification policies
package pullmanager

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package images
package pullmanager
import (
"bytes"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package images
package pullmanager
import (
"context"
@ -367,19 +367,6 @@ func (f *PullManager) getIntentCounterForImage(image string) int32 {
return intentNum
}
var _ ImagePullManager = &NoopImagePullManager{}
type NoopImagePullManager struct{}
func (m *NoopImagePullManager) RecordPullIntent(_ string) error { return nil }
func (m *NoopImagePullManager) RecordImagePulled(_, _ string, _ *kubeletconfiginternal.ImagePullCredentials) {
}
func (m *NoopImagePullManager) RecordImagePullFailed(image string) {}
func (m *NoopImagePullManager) MustAttemptImagePull(_, _ string, _ []kubeletconfiginternal.ImagePullSecret) bool {
return false
}
func (m *NoopImagePullManager) PruneUnknownRecords(_ []string, _ time.Time) {}
// searchForExistingTagDigest loops through the `image` RepoDigests and RepoTags
// and tries to find all image digests/tags in `inFlightPulls`, which is a map of
// containerImage -> pulling intent path.

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package images
package pullmanager
import (
"io/fs"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package images
package pullmanager
import (
"fmt"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package images
package pullmanager
import (
"reflect"

View File

@ -0,0 +1,110 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pullmanager
import (
"time"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
// ImagePullManager keeps the state of images that were pulled and which are
// currently still being pulled.
// It should keep an internal state of images currently being pulled by the kubelet
// in order to determine whether to destroy a "pulling" record should an image
// pull fail.
type ImagePullManager interface {
// RecordPullIntent records an intent to pull an image and should be called
// before a pull of the image occurs.
//
// RecordPullIntent() should be called before every image pull. Each call of
// RecordPullIntent() must match exactly one call of RecordImagePulled()/RecordImagePullFailed().
//
// `image` is the content of the pod's container `image` field.
RecordPullIntent(image string) error
// RecordImagePulled writes a record of an image being successfully pulled
// with ImagePullCredentials.
//
// `credentials` must not be nil and must contain either exactly one Kubernetes
// Secret coordinates in the `.KubernetesSecrets` slice or set `.NodePodsAccessible`
// to `true`.
//
// `image` is the content of the pod's container `image` field.
RecordImagePulled(image, imageRef string, credentials *kubeletconfiginternal.ImagePullCredentials)
// RecordImagePullFailed should be called if an image failed to pull.
//
// Internally, it lowers its reference counter for the given image. If the
// counter reaches zero, the pull intent record for the image is removed.
//
// `image` is the content of the pod's container `image` field.
RecordImagePullFailed(image string)
// MustAttemptImagePull evaluates the policy for the image specified in
// `image` and if the policy demands verification, it checks the internal
// cache to see if there's a record of pulling the image with the presented
// set of credentials or if the image can be accessed by any of the node's pods.
//
// Returns true if the policy demands verification and no record of the pull
// was found in the cache.
//
// `image` is the content of the pod's container `image` field.
MustAttemptImagePull(image, imageRef string, credentials []kubeletconfiginternal.ImagePullSecret) bool
// PruneUnknownRecords deletes all of the cache ImagePulledRecords for each of the images
// whose imageRef does not appear in the `imageList` iff such an record was last updated
// _before_ the `until` timestamp.
//
// This method is only expected to be called by the kubelet's image garbage collector.
// `until` is a timestamp created _before_ the `imageList` was requested from the CRI.
PruneUnknownRecords(imageList []string, until time.Time)
}
// PullRecordsAccessor allows unified access to ImagePullIntents/ImagePulledRecords
// irregardless of the backing database implementation
type PullRecordsAccessor interface {
// ListImagePullIntents lists all the ImagePullIntents in the database.
// ImagePullIntents that cannot be decoded will not appear in the list.
// Returns nil and an error if there was a problem reading from the database.
//
// This method may return partial success in case there were errors listing
// the results. A list of records that were successfully read and an aggregated
// error is returned in that case.
ListImagePullIntents() ([]*kubeletconfiginternal.ImagePullIntent, error)
// ImagePullIntentExists returns whether a valid ImagePullIntent is present
// for the given image.
ImagePullIntentExists(image string) (bool, error)
// WriteImagePullIntent writes a an intent record for the image into the database
WriteImagePullIntent(image string) error
// DeleteImagePullIntent removes an `image` intent record from the database
DeleteImagePullIntent(image string) error
// ListImagePulledRecords lists the database ImagePulledRecords.
// Records that cannot be decoded will be ignored.
// Returns an error if there was a problem reading from the database.
//
// This method may return partial success in case there were errors listing
// the results. A list of records that were successfully read and an aggregated
// error is returned in that case.
ListImagePulledRecords() ([]*kubeletconfiginternal.ImagePulledRecord, error)
// GetImagePulledRecord fetches an ImagePulledRecord for the given `imageRef`.
// If a file for the `imageRef` is present but the contents cannot be decoded,
// it returns a exists=true with err equal to the decoding error.
GetImagePulledRecord(imageRef string) (record *kubeletconfiginternal.ImagePulledRecord, exists bool, err error)
// WriteImagePulledRecord writes an ImagePulledRecord into the database.
WriteImagePulledRecord(record *kubeletconfiginternal.ImagePulledRecord) error
// DeleteImagePulledRecord removes an ImagePulledRecord for `imageRef` from the
// database.
DeleteImagePulledRecord(imageRef string) error
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package images
package pullmanager
import (
"hash/fnv"

View File

@ -0,0 +1,36 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pullmanager
import (
"time"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
var _ ImagePullManager = &NoopImagePullManager{}
type NoopImagePullManager struct{}
func (m *NoopImagePullManager) RecordPullIntent(_ string) error { return nil }
func (m *NoopImagePullManager) RecordImagePulled(_, _ string, _ *kubeletconfiginternal.ImagePullCredentials) {
}
func (m *NoopImagePullManager) RecordImagePullFailed(image string) {}
func (m *NoopImagePullManager) MustAttemptImagePull(_, _ string, _ []kubeletconfiginternal.ImagePullSecret) bool {
return false
}
func (m *NoopImagePullManager) PruneUnknownRecords(_ []string, _ time.Time) {}

View File

@ -19,11 +19,9 @@ package images
import (
"context"
"errors"
"time"
v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
var (
@ -54,90 +52,3 @@ type ImageManager interface {
// TODO(ronl): consolidating image managing and deleting operation in this interface
}
// ImagePullManager keeps the state of images that were pulled and which are
// currently still being pulled.
// It should keep an internal state of images currently being pulled by the kubelet
// in order to determine whether to destroy a "pulling" record should an image
// pull fail.
type ImagePullManager interface {
// RecordPullIntent records an intent to pull an image and should be called
// before a pull of the image occurs.
//
// RecordPullIntent() should be called before every image pull. Each call of
// RecordPullIntent() must match exactly one call of RecordImagePulled()/RecordImagePullFailed().
//
// `image` is the content of the pod's container `image` field.
RecordPullIntent(image string) error
// RecordImagePulled writes a record of an image being successfully pulled
// with ImagePullCredentials.
//
// `credentials` must not be nil and must contain either exactly one Kubernetes
// Secret coordinates in the `.KubernetesSecrets` slice or set `.NodePodsAccessible`
// to `true`.
//
// `image` is the content of the pod's container `image` field.
RecordImagePulled(image, imageRef string, credentials *kubeletconfiginternal.ImagePullCredentials)
// RecordImagePullFailed should be called if an image failed to pull.
//
// Internally, it lowers its reference counter for the given image. If the
// counter reaches zero, the pull intent record for the image is removed.
//
// `image` is the content of the pod's container `image` field.
RecordImagePullFailed(image string)
// MustAttemptImagePull evaluates the policy for the image specified in
// `image` and if the policy demands verification, it checks the internal
// cache to see if there's a record of pulling the image with the presented
// set of credentials or if the image can be accessed by any of the node's pods.
//
// Returns true if the policy demands verification and no record of the pull
// was found in the cache.
//
// `image` is the content of the pod's container `image` field.
MustAttemptImagePull(image, imageRef string, credentials []kubeletconfiginternal.ImagePullSecret) bool
// PruneUnknownRecords deletes all of the cache ImagePulledRecords for each of the images
// whose imageRef does not appear in the `imageList` iff such an record was last updated
// _before_ the `until` timestamp.
//
// This method is only expected to be called by the kubelet's image garbage collector.
// `until` is a timestamp created _before_ the `imageList` was requested from the CRI.
PruneUnknownRecords(imageList []string, until time.Time)
}
// PullRecordsAccessor allows unified access to ImagePullIntents/ImagePulledRecords
// irregardless of the backing database implementation
type PullRecordsAccessor interface {
// ListImagePullIntents lists all the ImagePullIntents in the database.
// ImagePullIntents that cannot be decoded will not appear in the list.
// Returns nil and an error if there was a problem reading from the database.
//
// This method may return partial success in case there were errors listing
// the results. A list of records that were successfully read and an aggregated
// error is returned in that case.
ListImagePullIntents() ([]*kubeletconfiginternal.ImagePullIntent, error)
// ImagePullIntentExists returns whether a valid ImagePullIntent is present
// for the given image.
ImagePullIntentExists(image string) (bool, error)
// WriteImagePullIntent writes a an intent record for the image into the database
WriteImagePullIntent(image string) error
// DeleteImagePullIntent removes an `image` intent record from the database
DeleteImagePullIntent(image string) error
// ListImagePulledRecords lists the database ImagePulledRecords.
// Records that cannot be decoded will be ignored.
// Returns an error if there was a problem reading from the database.
//
// This method may return partial success in case there were errors listing
// the results. A list of records that were successfully read and an aggregated
// error is returned in that case.
ListImagePulledRecords() ([]*kubeletconfiginternal.ImagePulledRecord, error)
// GetImagePulledRecord fetches an ImagePulledRecord for the given `imageRef`.
// If a file for the `imageRef` is present but the contents cannot be decoded,
// it returns a exists=true with err equal to the decoding error.
GetImagePulledRecord(imageRef string) (record *kubeletconfiginternal.ImagePulledRecord, exists bool, err error)
// WriteImagePulledRecord writes an ImagePulledRecord into the database.
WriteImagePulledRecord(record *kubeletconfiginternal.ImagePulledRecord) error
// DeleteImagePulledRecord removes an ImagePulledRecord for `imageRef` from the
// database.
DeleteImagePulledRecord(imageRef string) error
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/images"
imagepullmanager "k8s.io/kubernetes/pkg/kubelet/images/pullmanager"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
@ -135,7 +136,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
kubecontainer.FilterEventRecorder(recorder),
&credentialprovider.BasicDockerKeyring{},
kubeRuntimeManager,
&images.NoopImagePullManager{},
&imagepullmanager.NoopImagePullManager{},
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
ptr.To[int32](0), // No limit on max parallel image pulls,

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/images"
imagepullmanager "k8s.io/kubernetes/pkg/kubelet/images/pullmanager"
"k8s.io/utils/ptr"
)
@ -311,12 +312,12 @@ func TestPullWithSecrets(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestRuntimeManager()
require.NoError(t, err)
fsRecordAccessor, err := images.NewFSPullRecordsAccessor(t.TempDir())
fsRecordAccessor, err := imagepullmanager.NewFSPullRecordsAccessor(t.TempDir())
if err != nil {
t.Fatal("failed to setup an file pull records accessor")
}
imagePullManager, err := images.NewImagePullManager(context.Background(), fsRecordAccessor, images.AlwaysVerifyImagePullPolicy(), fakeManager, 10)
imagePullManager, err := imagepullmanager.NewImagePullManager(context.Background(), fsRecordAccessor, imagepullmanager.AlwaysVerifyImagePullPolicy(), fakeManager, 10)
if err != nil {
t.Fatal("failed to setup an image pull manager")
}
@ -385,12 +386,12 @@ func TestPullWithSecretsWithError(t *testing.T) {
fakeImageService.InjectError("PullImage", fmt.Errorf("test-error"))
}
fsRecordAccessor, err := images.NewFSPullRecordsAccessor(t.TempDir())
fsRecordAccessor, err := imagepullmanager.NewFSPullRecordsAccessor(t.TempDir())
if err != nil {
t.Fatal("failed to setup an file pull records accessor")
}
imagePullManager, err := images.NewImagePullManager(context.Background(), fsRecordAccessor, images.AlwaysVerifyImagePullPolicy(), fakeManager, 10)
imagePullManager, err := imagepullmanager.NewImagePullManager(context.Background(), fsRecordAccessor, imagepullmanager.AlwaysVerifyImagePullPolicy(), fakeManager, 10)
if err != nil {
t.Fatal("failed to setup an image pull manager")
}

View File

@ -56,6 +56,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
imagepullmanager "k8s.io/kubernetes/pkg/kubelet/images/pullmanager"
runtimeutil "k8s.io/kubernetes/pkg/kubelet/kuberuntime/util"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
@ -288,9 +289,9 @@ func NewKubeGenericRuntimeManager(
}
var imageGCHooks []images.PostImageGCHook
var imagePullManager images.ImagePullManager = &images.NoopImagePullManager{}
var imagePullManager imagepullmanager.ImagePullManager = &imagepullmanager.NoopImagePullManager{}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
imagePullCredentialsVerificationPolicy, err := images.NewImagePullCredentialVerificationPolicy(
imagePullCredentialsVerificationPolicy, err := imagepullmanager.NewImagePullCredentialVerificationPolicy(
kubeletconfiginternal.ImagePullCredentialsVerificationPolicy(imagePullsCredentialVerificationPolicy),
preloadedImagesCredentialVerificationWhitelist)
@ -298,12 +299,12 @@ func NewKubeGenericRuntimeManager(
return nil, nil, err
}
fsRecordAccessor, err := images.NewFSPullRecordsAccessor(rootDirectory)
fsRecordAccessor, err := imagepullmanager.NewFSPullRecordsAccessor(rootDirectory)
if err != nil {
return nil, nil, fmt.Errorf("failed to setup the FSPullRecordsAccessor: %w", err)
}
imagePullManager, err = images.NewImagePullManager(ctx, fsRecordAccessor, imagePullCredentialsVerificationPolicy, kubeRuntimeManager, ptr.Deref(maxParallelImagePulls, 0))
imagePullManager, err = imagepullmanager.NewImagePullManager(ctx, fsRecordAccessor, imagePullCredentialsVerificationPolicy, kubeRuntimeManager, ptr.Deref(maxParallelImagePulls, 0))
if err != nil {
return nil, nil, fmt.Errorf("failed to create image pull manager: %w", err)
}