Merge pull request #115220 from ruiwen-zhao/limit

Add MaxParallelImagePulls support
Kubernetes Prow Robot, 2023-03-01 23:32:55 -08:00, committed by GitHub
commit af9f7a4d90
18 changed files with 407 additions and 16 deletions

@ -57699,6 +57699,13 @@ func schema_k8sio_kubelet_config_v1beta1_KubeletConfiguration(ref common.Referen
Format: "",
},
},
"maxParallelImagePulls": {
SchemaProps: spec.SchemaProps{
Description: "MaxParallelImagePulls sets the maximum number of image pulls in parallel. This field cannot be set if SerializeImagePulls is true. Setting it to nil means no limit. Default: nil",
Type: []string{"integer"},
Format: "int32",
},
},
"evictionHard": {
SchemaProps: spec.SchemaProps{
Description: "evictionHard is a map of signal names to quantities that defines hard eviction thresholds. For example: `{\"memory.available\": \"300Mi\"}`. To explicitly disable, pass a 0% or 100% threshold on an arbitrary resource. Default:\n memory.available: \"100Mi\"\n nodefs.available: \"10%\"\n nodefs.inodesFree: \"5%\"\n imagefs.available: \"15%\"",

@ -266,6 +266,7 @@ var (
"RunOnce",
"SeccompDefault",
"SerializeImagePulls",
"MaxParallelImagePulls",
"ShowHiddenMetricsForVersion",
"ShutdownGracePeriodByPodPriority[*].Priority",
"ShutdownGracePeriodByPodPriority[*].ShutdownGracePeriodSeconds",

@ -292,6 +292,8 @@ type KubeletConfiguration struct {
KubeAPIBurst int32
// serializeImagePulls when enabled, tells the Kubelet to pull images one at a time.
SerializeImagePulls bool
// MaxParallelImagePulls sets the maximum number of image pulls in parallel.
MaxParallelImagePulls *int32
// Map of signal names to quantities that defines hard eviction thresholds. For example: {"memory.available": "300Mi"}.
// Some default signals are Linux only: nodefs.inodesFree
EvictionHard map[string]string

@ -206,7 +206,14 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
obj.KubeAPIBurst = 10
}
if obj.SerializeImagePulls == nil {
// SerializeImagePulls defaults to true when MaxParallelImagePulls
// is unset or less than 2, and to false when MaxParallelImagePulls
// allows more than one pull. This saves users from having to set both fields.
if obj.MaxParallelImagePulls == nil || *obj.MaxParallelImagePulls < 2 {
obj.SerializeImagePulls = utilpointer.Bool(true)
} else {
obj.SerializeImagePulls = utilpointer.Bool(false)
}
}
if obj.EvictionPressureTransitionPeriod == zeroDuration {
obj.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 5 * time.Minute}
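
A quick illustration of the coupled defaulting above (a sketch, not part of the change; it reuses SetDefaults_KubeletConfiguration, the kubeletconfigv1beta1.KubeletConfiguration type, and the utilpointer helper already visible in this file):

	// Only the new field is set; SerializeImagePulls is derived from it.
	cfg := &kubeletconfigv1beta1.KubeletConfiguration{
		MaxParallelImagePulls: utilpointer.Int32(5),
	}
	SetDefaults_KubeletConfiguration(cfg)
	// *cfg.SerializeImagePulls == false, because the limit allows more than one pull.

	cfg = &kubeletconfigv1beta1.KubeletConfiguration{
		MaxParallelImagePulls: utilpointer.Int32(1),
	}
	SetDefaults_KubeletConfiguration(cfg)
	// *cfg.SerializeImagePulls == true; a limit of 1 is equivalent to serialized pulls.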

@ -100,6 +100,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
KubeAPIQPS: utilpointer.Int32(5),
KubeAPIBurst: 10,
SerializeImagePulls: utilpointer.Bool(true),
MaxParallelImagePulls: nil,
EvictionHard: nil,
EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute},
EnableControllerAttachDetach: utilpointer.Bool(true),
@ -206,6 +207,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
KubeAPIQPS: utilpointer.Int32(0),
KubeAPIBurst: 0,
SerializeImagePulls: utilpointer.Bool(false),
MaxParallelImagePulls: nil,
EvictionHard: map[string]string{},
EvictionSoft: map[string]string{},
EvictionSoftGracePeriod: map[string]string{},
@ -314,6 +316,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
KubeAPIQPS: utilpointer.Int32(0),
KubeAPIBurst: 10,
SerializeImagePulls: utilpointer.Bool(false),
MaxParallelImagePulls: nil,
EvictionHard: map[string]string{},
EvictionSoft: map[string]string{},
EvictionSoftGracePeriod: map[string]string{},
@ -429,6 +432,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
KubeAPIQPS: utilpointer.Int32(1),
KubeAPIBurst: 1,
SerializeImagePulls: utilpointer.Bool(true),
MaxParallelImagePulls: utilpointer.Int32(5),
EvictionHard: map[string]string{
"memory.available": "1Mi",
"nodefs.available": "1%",
@ -574,6 +578,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
KubeAPIQPS: utilpointer.Int32(1),
KubeAPIBurst: 1,
SerializeImagePulls: utilpointer.Bool(true),
MaxParallelImagePulls: utilpointer.Int32Ptr(5),
EvictionHard: map[string]string{
"memory.available": "1Mi",
"nodefs.available": "1%",
@ -704,6 +709,185 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
KubeAPIQPS: utilpointer.Int32(5),
KubeAPIBurst: 10,
SerializeImagePulls: utilpointer.Bool(true),
MaxParallelImagePulls: nil,
EvictionHard: nil,
EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute},
EnableControllerAttachDetach: utilpointer.Bool(true),
MakeIPTablesUtilChains: utilpointer.Bool(true),
IPTablesMasqueradeBit: utilpointer.Int32Ptr(DefaultIPTablesMasqueradeBit),
IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit),
FailSwapOn: utilpointer.Bool(true),
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32Ptr(5),
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement,
VolumePluginDir: DefaultVolumePluginDir,
Logging: logsapi.LoggingConfiguration{
Format: "text",
FlushFrequency: 5 * time.Second,
},
EnableSystemLogHandler: utilpointer.Bool(true),
EnableProfilingHandler: utilpointer.Bool(true),
EnableDebugFlagsHandler: utilpointer.Bool(true),
SeccompDefault: utilpointer.Bool(false),
MemoryThrottlingFactor: utilpointer.Float64Ptr(DefaultMemoryThrottlingFactor),
RegisterNode: utilpointer.Bool(true),
LocalStorageCapacityIsolation: utilpointer.Bool(true),
},
},
{
"SerializeImagePull defaults to false when MaxParallelImagePulls is larger than 1",
&v1beta1.KubeletConfiguration{
MaxParallelImagePulls: utilpointer.Int32(5),
},
&v1beta1.KubeletConfiguration{
EnableServer: utilpointer.Bool(true),
SyncFrequency: metav1.Duration{Duration: 1 * time.Minute},
FileCheckFrequency: metav1.Duration{Duration: 20 * time.Second},
HTTPCheckFrequency: metav1.Duration{Duration: 20 * time.Second},
Address: "0.0.0.0",
Port: ports.KubeletPort,
Authentication: v1beta1.KubeletAuthentication{
Anonymous: v1beta1.KubeletAnonymousAuthentication{Enabled: utilpointer.Bool(false)},
Webhook: v1beta1.KubeletWebhookAuthentication{
Enabled: utilpointer.Bool(true),
CacheTTL: metav1.Duration{Duration: 2 * time.Minute},
},
},
Authorization: v1beta1.KubeletAuthorization{
Mode: v1beta1.KubeletAuthorizationModeWebhook,
Webhook: v1beta1.KubeletWebhookAuthorization{
CacheAuthorizedTTL: metav1.Duration{Duration: 5 * time.Minute},
CacheUnauthorizedTTL: metav1.Duration{Duration: 30 * time.Second},
},
},
RegistryPullQPS: utilpointer.Int32Ptr(5),
RegistryBurst: 10,
EventRecordQPS: utilpointer.Int32Ptr(5),
EventBurst: 10,
EnableDebuggingHandlers: utilpointer.Bool(true),
HealthzPort: utilpointer.Int32Ptr(10248),
HealthzBindAddress: "127.0.0.1",
OOMScoreAdj: utilpointer.Int32Ptr(int32(qos.KubeletOOMScoreAdj)),
StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second},
NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute},
NodeLeaseDurationSeconds: 40,
ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock",
ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute},
ImageGCHighThresholdPercent: utilpointer.Int32Ptr(85),
ImageGCLowThresholdPercent: utilpointer.Int32Ptr(80),
VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute},
CgroupsPerQOS: utilpointer.Bool(true),
CgroupDriver: "cgroupfs",
CPUManagerPolicy: "none",
CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second},
MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy,
TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy,
TopologyManagerScope: v1beta1.ContainerTopologyManagerScope,
RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute},
HairpinMode: v1beta1.PromiscuousBridge,
MaxPods: 110,
PodPidsLimit: utilpointer.Int64(-1),
ResolverConfig: utilpointer.String(kubetypes.ResolvConfDefault),
CPUCFSQuota: utilpointer.Bool(true),
CPUCFSQuotaPeriod: &metav1.Duration{Duration: 100 * time.Millisecond},
NodeStatusMaxImages: utilpointer.Int32Ptr(50),
MaxOpenFiles: 1000000,
ContentType: "application/vnd.kubernetes.protobuf",
KubeAPIQPS: utilpointer.Int32Ptr(5),
KubeAPIBurst: 10,
SerializeImagePulls: utilpointer.Bool(false),
MaxParallelImagePulls: utilpointer.Int32(5),
EvictionHard: nil,
EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute},
EnableControllerAttachDetach: utilpointer.Bool(true),
MakeIPTablesUtilChains: utilpointer.Bool(true),
IPTablesMasqueradeBit: utilpointer.Int32Ptr(DefaultIPTablesMasqueradeBit),
IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit),
FailSwapOn: utilpointer.Bool(true),
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32Ptr(5),
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement,
VolumePluginDir: DefaultVolumePluginDir,
Logging: logsapi.LoggingConfiguration{
Format: "text",
FlushFrequency: 5 * time.Second,
},
EnableSystemLogHandler: utilpointer.Bool(true),
EnableProfilingHandler: utilpointer.Bool(true),
EnableDebugFlagsHandler: utilpointer.Bool(true),
SeccompDefault: utilpointer.Bool(false),
MemoryThrottlingFactor: utilpointer.Float64Ptr(DefaultMemoryThrottlingFactor),
RegisterNode: utilpointer.Bool(true),
LocalStorageCapacityIsolation: utilpointer.Bool(true),
},
},
{
"SerializeImagePull defaults to true when MaxParallelImagePulls is set to 1",
&v1beta1.KubeletConfiguration{
MaxParallelImagePulls: utilpointer.Int32(1),
},
&v1beta1.KubeletConfiguration{
EnableServer: utilpointer.Bool(true),
SyncFrequency: metav1.Duration{Duration: 1 * time.Minute},
FileCheckFrequency: metav1.Duration{Duration: 20 * time.Second},
HTTPCheckFrequency: metav1.Duration{Duration: 20 * time.Second},
Address: "0.0.0.0",
Port: ports.KubeletPort,
Authentication: v1beta1.KubeletAuthentication{
Anonymous: v1beta1.KubeletAnonymousAuthentication{Enabled: utilpointer.Bool(false)},
Webhook: v1beta1.KubeletWebhookAuthentication{
Enabled: utilpointer.Bool(true),
CacheTTL: metav1.Duration{Duration: 2 * time.Minute},
},
},
Authorization: v1beta1.KubeletAuthorization{
Mode: v1beta1.KubeletAuthorizationModeWebhook,
Webhook: v1beta1.KubeletWebhookAuthorization{
CacheAuthorizedTTL: metav1.Duration{Duration: 5 * time.Minute},
CacheUnauthorizedTTL: metav1.Duration{Duration: 30 * time.Second},
},
},
RegistryPullQPS: utilpointer.Int32Ptr(5),
RegistryBurst: 10,
EventRecordQPS: utilpointer.Int32Ptr(5),
EventBurst: 10,
EnableDebuggingHandlers: utilpointer.Bool(true),
HealthzPort: utilpointer.Int32Ptr(10248),
HealthzBindAddress: "127.0.0.1",
OOMScoreAdj: utilpointer.Int32Ptr(int32(qos.KubeletOOMScoreAdj)),
StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second},
NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute},
NodeLeaseDurationSeconds: 40,
ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock",
ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute},
ImageGCHighThresholdPercent: utilpointer.Int32Ptr(85),
ImageGCLowThresholdPercent: utilpointer.Int32Ptr(80),
VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute},
CgroupsPerQOS: utilpointer.Bool(true),
CgroupDriver: "cgroupfs",
CPUManagerPolicy: "none",
CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second},
MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy,
TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy,
TopologyManagerScope: v1beta1.ContainerTopologyManagerScope,
RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute},
HairpinMode: v1beta1.PromiscuousBridge,
MaxPods: 110,
PodPidsLimit: utilpointer.Int64(-1),
ResolverConfig: utilpointer.String(kubetypes.ResolvConfDefault),
CPUCFSQuota: utilpointer.Bool(true),
CPUCFSQuotaPeriod: &metav1.Duration{Duration: 100 * time.Millisecond},
NodeStatusMaxImages: utilpointer.Int32Ptr(50),
MaxOpenFiles: 1000000,
ContentType: "application/vnd.kubernetes.protobuf",
KubeAPIQPS: utilpointer.Int32Ptr(5),
KubeAPIBurst: 10,
SerializeImagePulls: utilpointer.Bool(true),
MaxParallelImagePulls: utilpointer.Int32(1),
EvictionHard: nil,
EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute},
EnableControllerAttachDetach: utilpointer.Bool(true),

@ -443,6 +443,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
if err := v1.Convert_Pointer_bool_To_bool(&in.SerializeImagePulls, &out.SerializeImagePulls, s); err != nil {
return err
}
out.MaxParallelImagePulls = (*int32)(unsafe.Pointer(in.MaxParallelImagePulls))
out.EvictionHard = *(*map[string]string)(unsafe.Pointer(&in.EvictionHard))
out.EvictionSoft = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoft))
out.EvictionSoftGracePeriod = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoftGracePeriod))
@ -626,6 +627,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
if err := v1.Convert_bool_To_Pointer_bool(&in.SerializeImagePulls, &out.SerializeImagePulls, s); err != nil {
return err
}
out.MaxParallelImagePulls = (*int32)(unsafe.Pointer(in.MaxParallelImagePulls))
out.EvictionHard = *(*map[string]string)(unsafe.Pointer(&in.EvictionHard))
out.EvictionSoft = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoft))
out.EvictionSoftGracePeriod = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoftGracePeriod))

@ -122,6 +122,12 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
if kc.RegistryPullQPS < 0 {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: registryPullQPS (--registry-qps) %v must not be a negative number", kc.RegistryPullQPS))
}
if kc.MaxParallelImagePulls != nil && *kc.MaxParallelImagePulls < 1 {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: maxParallelImagePulls %v must be a positive number", *kc.MaxParallelImagePulls))
}
if kc.SerializeImagePulls && kc.MaxParallelImagePulls != nil && *kc.MaxParallelImagePulls > 1 {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: maxParallelImagePulls cannot be larger than 1 unless SerializeImagePulls (--serialize-image-pulls) is set to false"))
}
if kc.ServerTLSBootstrap && !localFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: serverTLSBootstrap %v requires feature gate RotateKubeletServerCertificate", kc.ServerTLSBootstrap))
}
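
Putting the two checks together, these are the combinations ValidateKubeletConfiguration accepts and rejects (an illustrative sketch, not part of the change; kc is a kubeletconfig.KubeletConfiguration value and utilpointer is the helper used in the tests below):

	kc.SerializeImagePulls = false
	kc.MaxParallelImagePulls = utilpointer.Int32(10) // accepted: positive limit, pulls are parallel
	kc.MaxParallelImagePulls = utilpointer.Int32(0)  // rejected: the limit must be a positive number

	kc.SerializeImagePulls = true
	kc.MaxParallelImagePulls = utilpointer.Int32(1)  // accepted: a limit of 1 is compatible with serialized pulls
	kc.MaxParallelImagePulls = utilpointer.Int32(10) // rejected: a limit above 1 requires serializeImagePulls=false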

@ -57,6 +57,7 @@ var (
ReadOnlyPort: 0,
RegistryBurst: 10,
RegistryPullQPS: 5,
MaxParallelImagePulls: nil,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
CPUCFSQuotaPeriod: metav1.Duration{Duration: 25 * time.Millisecond},
@ -298,6 +299,31 @@ func TestValidateKubeletConfiguration(t *testing.T) {
},
errMsg: "invalid configuration: registryPullQPS (--registry-qps) -1 must not be a negative number",
},
{
name: "invalid MaxParallelImagePulls",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
conf.MaxParallelImagePulls = utilpointer.Int32(0)
return conf
},
errMsg: "invalid configuration: maxParallelImagePulls 0 must be a positive number",
},
{
name: "invalid MaxParallelImagePulls and SerializeImagePulls combination",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
conf.MaxParallelImagePulls = utilpointer.Int32(3)
conf.SerializeImagePulls = true
return conf
},
errMsg: "invalid configuration: maxParallelImagePulls cannot be larger than 1 unless SerializeImagePulls (--serialize-image-pulls) is set to false",
},
{
name: "valid MaxParallelImagePulls and SerializeImagePulls combination",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
conf.MaxParallelImagePulls = utilpointer.Int32(1)
conf.SerializeImagePulls = true
return conf
},
},
{
name: "specify ServerTLSBootstrap without enabling RotateKubeletServerCertificate",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {

@ -227,6 +227,11 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
}
out.RuntimeRequestTimeout = in.RuntimeRequestTimeout
out.CPUCFSQuotaPeriod = in.CPUCFSQuotaPeriod
if in.MaxParallelImagePulls != nil {
in, out := &in.MaxParallelImagePulls, &out.MaxParallelImagePulls
*out = new(int32)
**out = **in
}
if in.EvictionHard != nil {
in, out := &in.EvictionHard, &out.EvictionHard
*out = make(map[string]string, len(*in))

@ -58,6 +58,11 @@ type FakeRuntime struct {
Err error
InspectErr error
StatusErr error
// If BlockImagePulls is true, all PullImage() calls block until
// UnblockImagePulls() is called. This is used to simulate image pull latency
// from the container runtime.
BlockImagePulls bool
imagePullTokenBucket chan bool
T *testing.T
}
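
A minimal sketch of how a test can drive these fields (illustrative only; it assumes a *FakeRuntime value named fakeRuntime, a context ctx, and an image spec — the authoritative end-to-end usage is TestMaxParallelImagePullsLimit further down):

	fakeRuntime.BlockImagePulls = true
	go func() {
		// With BlockImagePulls set, this call parks until a token is released
		// via UnblockImagePulls or ctx is cancelled.
		_, _ = fakeRuntime.PullImage(ctx, imageSpec, nil, nil)
	}()
	// In practice the test waits until the pull is parked before unblocking.
	fakeRuntime.UnblockImagePulls(1) // release exactly one parked pull
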
@ -129,6 +134,17 @@ func (f *FakeRuntime) ClearCalls() {
f.Err = nil
f.InspectErr = nil
f.StatusErr = nil
f.BlockImagePulls = false
if f.imagePullTokenBucket != nil {
for {
select {
case f.imagePullTokenBucket <- true:
default:
f.imagePullTokenBucket = nil
return
}
}
}
}
// UpdatePodCIDR fulfills the cri interface.
@ -151,6 +167,23 @@ func (f *FakeRuntime) AssertCalls(calls []string) bool {
return f.assertList(calls, f.CalledFunctions)
}
// AssertCallCounts checks whether a given function was called the expected number of times.
func (f *FakeRuntime) AssertCallCounts(funcName string, expectedCount int) bool {
f.Lock()
defer f.Unlock()
actualCount := 0
for _, c := range f.CalledFunctions {
if funcName == c {
actualCount += 1
}
}
if expectedCount != actualCount {
f.T.Errorf("AssertCallCounts: expected %s to be called %d times, but was actually called %d times.", funcName, expectedCount, actualCount)
return false
}
return true
}
func (f *FakeRuntime) AssertStartedPods(pods []string) bool {
f.Lock()
defer f.Unlock()
@ -302,10 +335,8 @@ func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, container
return f.Err
}
func (f *FakeRuntime) PullImage(_ context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "PullImage")
if f.Err == nil {
i := kubecontainer.Image{
@ -314,9 +345,37 @@ func (f *FakeRuntime) PullImage(_ context.Context, image kubecontainer.ImageSpec
}
f.ImageList = append(f.ImageList, i)
}
if !f.BlockImagePulls {
f.Unlock()
return image.Image, f.Err
}
retErr := f.Err
if f.imagePullTokenBucket == nil {
f.imagePullTokenBucket = make(chan bool, 1)
}
// Unlock before waiting for UnblockImagePulls calls, to avoid deadlock.
f.Unlock()
select {
case <-ctx.Done():
case <-f.imagePullTokenBucket:
}
return image.Image, retErr
}
// UnblockImagePulls unblocks a certain number of image pulls, if BlockImagePulls is true.
func (f *FakeRuntime) UnblockImagePulls(count int) {
if f.imagePullTokenBucket != nil {
for i := 0; i < count; i++ {
select {
case f.imagePullTokenBucket <- true:
default:
}
}
}
}
func (f *FakeRuntime) GetImageRef(_ context.Context, image kubecontainer.ImageSpec) (string, error) {
f.Lock()
defer f.Unlock()

@ -52,14 +52,14 @@ type imageManager struct {
var _ ImageManager = &imageManager{}
// NewImageManager instantiates a new ImageManager object.
func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager {
func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, maxParallelImagePulls *int32, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager {
imageService = throttleImagePulling(imageService, qps, burst)
var puller imagePuller
if serialized {
puller = newSerialImagePuller(imageService)
} else {
puller = newParallelImagePuller(imageService)
puller = newParallelImagePuller(imageService, maxParallelImagePulls)
}
return &imageManager{
recorder: recorder,

@ -19,6 +19,7 @@ package images
import (
"context"
"errors"
"sync"
"testing"
"time"
@ -31,6 +32,7 @@ import (
. "k8s.io/kubernetes/pkg/kubelet/container"
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
testingclock "k8s.io/utils/clock/testing"
utilpointer "k8s.io/utils/pointer"
)
type pullerExpects struct {
@ -166,7 +168,7 @@ func (m *mockPodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID)
func (m *mockPodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {}
func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fakeClock *testingclock.FakeClock, fakeRuntime *ctest.FakeRuntime, container *v1.Container) {
func pullerTestEnv(c pullerTestCase, serialized bool, maxParallelImagePulls *int32) (puller ImageManager, fakeClock *testingclock.FakeClock, fakeRuntime *ctest.FakeRuntime, container *v1.Container) {
container = &v1.Container{
Name: "container_name",
Image: c.containerImage,
@ -184,7 +186,7 @@ func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fake
fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr
puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, c.qps, c.burst, &mockPodPullingTimeRecorder{})
puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, maxParallelImagePulls, c.qps, c.burst, &mockPodPullingTimeRecorder{})
return
}
@ -201,7 +203,7 @@ func TestParallelPuller(t *testing.T) {
useSerializedEnv := false
for _, c := range cases {
puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv)
puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv, nil)
t.Run(c.testName, func(t *testing.T) {
ctx := context.Background()
@ -229,7 +231,7 @@ func TestSerializedPuller(t *testing.T) {
useSerializedEnv := true
for _, c := range cases {
puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv)
puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv, nil)
t.Run(c.testName, func(t *testing.T) {
ctx := context.Background()
@ -287,7 +289,7 @@ func TestPullAndListImageWithPodAnnotations(t *testing.T) {
}}
useSerializedEnv := true
puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv)
puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv, nil)
fakeRuntime.CalledFunctions = nil
fakeRuntime.ImageList = []Image{}
fakeClock.Step(time.Second)
@ -312,3 +314,69 @@ func TestPullAndListImageWithPodAnnotations(t *testing.T) {
assert.Equal(t, expectedAnnotations, image.Spec.Annotations, "image spec annotations")
})
}
func TestMaxParallelImagePullsLimit(t *testing.T) {
ctx := context.Background()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test_pod",
Namespace: "test-ns",
UID: "bar",
ResourceVersion: "42",
}}
testCase := &pullerTestCase{
containerImage: "present_image",
testName: "image present, pull ",
policy: v1.PullAlways,
inspectErr: nil,
pullerErr: nil,
qps: 0.0,
burst: 0,
}
useSerializedEnv := false
maxParallelImagePulls := 5
var wg sync.WaitGroup
puller, fakeClock, fakeRuntime, container := pullerTestEnv(*testCase, useSerializedEnv, utilpointer.Int32Ptr(int32(maxParallelImagePulls)))
fakeRuntime.BlockImagePulls = true
fakeRuntime.CalledFunctions = nil
fakeRuntime.T = t
fakeClock.Step(time.Second)
// First 5 EnsureImageExists should result in runtime calls
for i := 0; i < maxParallelImagePulls; i++ {
wg.Add(1)
go func() {
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil)
assert.Nil(t, err)
wg.Done()
}()
}
time.Sleep(1 * time.Second)
fakeRuntime.AssertCallCounts("PullImage", 5)
// Next two EnsureImageExists should be blocked because maxParallelImagePulls is hit
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil)
assert.Nil(t, err)
wg.Done()
}()
}
time.Sleep(1 * time.Second)
fakeRuntime.AssertCallCounts("PullImage", 5)
// Unblock two image pulls from runtime, and two EnsureImageExists can go through
fakeRuntime.UnblockImagePulls(2)
time.Sleep(1 * time.Second)
fakeRuntime.AssertCallCounts("PullImage", 7)
// Unblock the remaining 5 image pulls from runtime, and all EnsureImageExists can go through
fakeRuntime.UnblockImagePulls(5)
wg.Wait()
fakeRuntime.AssertCallCounts("PullImage", 7)
}

@ -40,14 +40,22 @@ var _, _ imagePuller = &parallelImagePuller{}, &serialImagePuller{}
type parallelImagePuller struct {
imageService kubecontainer.ImageService
tokens chan struct{}
}
func newParallelImagePuller(imageService kubecontainer.ImageService) imagePuller {
return &parallelImagePuller{imageService}
func newParallelImagePuller(imageService kubecontainer.ImageService, maxParallelImagePulls *int32) imagePuller {
if maxParallelImagePulls == nil || *maxParallelImagePulls < 1 {
return &parallelImagePuller{imageService, nil}
}
return &parallelImagePuller{imageService, make(chan struct{}, *maxParallelImagePulls)}
}
func (pip *parallelImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
go func() {
if pip.tokens != nil {
pip.tokens <- struct{}{}
defer func() { <-pip.tokens }()
}
startTime := time.Now()
imageRef, err := pip.imageService.PullImage(ctx, spec, pullSecrets, podSandboxConfig)
pullChan <- pullResult{
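
The tokens channel is a counting semaphore: when a limit is configured, each pull sends into the buffered channel before calling the runtime and receives from it when done, so at most cap(tokens) pulls are in flight; a nil channel leaves parallelism unbounded, matching the previous behavior. A standalone sketch of the same pattern (generic names — images and pull are placeholders, not kubelet code):

	sem := make(chan struct{}, 5) // allow at most 5 concurrent pulls
	for _, img := range images {
		img := img
		go func() {
			sem <- struct{}{}        // acquire a slot; blocks while 5 pulls are running
			defer func() { <-sem }() // free the slot when this pull finishes
			pull(img)
		}()
	}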

@ -660,6 +660,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
insecureContainerLifecycleHTTPClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
kubeCfg.MaxParallelImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
imageCredentialProviderConfigFile,

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
utilpointer "k8s.io/utils/pointer"
)
const (
@ -129,6 +130,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
kubeRuntimeManager,
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
utilpointer.Int32Ptr(0), // No limit on max parallel image pulls,
0, // Disable image pull throttling by setting QPS to 0,
0,
&fakePodPullingTimeRecorder{},

@ -188,6 +188,7 @@ func NewKubeGenericRuntimeManager(
insecureContainerLifecycleHTTPClient types.HTTPDoer,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
maxParallelImagePulls *int32,
imagePullQPS float32,
imagePullBurst int,
imageCredentialProviderConfigFile string,
@ -275,6 +276,7 @@ func NewKubeGenericRuntimeManager(
kubeRuntimeManager,
imageBackOff,
serializeImagePulls,
maxParallelImagePulls,
imagePullQPS,
imagePullBurst,
podPullingTimeRecorder)

@ -482,6 +482,12 @@ type KubeletConfiguration struct {
// Default: true
// +optional
SerializeImagePulls *bool `json:"serializeImagePulls,omitempty"`
// MaxParallelImagePulls sets the maximum number of image pulls in parallel.
// This field cannot be set if SerializeImagePulls is true.
// Setting it to nil means no limit.
// Default: nil
// +optional
MaxParallelImagePulls *int32 `json:"maxParallelImagePulls,omitempty"`
// evictionHard is a map of signal names to quantities that defines hard eviction
// thresholds. For example: `{"memory.available": "300Mi"}`.
// To explicitly disable, pass a 0% or 100% threshold on an arbitrary resource.
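
For completeness, a sketch of how the new field is meant to be used from a kubelet configuration (illustrative values; utilpointer stands for k8s.io/utils/pointer as elsewhere in this change — in a KubeletConfiguration file these are the serializeImagePulls and maxParallelImagePulls keys):

	cfg := &v1beta1.KubeletConfiguration{
		// Must be false (or left unset, in which case defaulting derives it)
		// for a limit greater than 1 to pass validation.
		SerializeImagePulls: utilpointer.Bool(false),
		// Allow up to 10 image pulls to run concurrently; nil means no limit.
		MaxParallelImagePulls: utilpointer.Int32(10),
	}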

@ -311,6 +311,11 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = new(bool)
**out = **in
}
if in.MaxParallelImagePulls != nil {
in, out := &in.MaxParallelImagePulls, &out.MaxParallelImagePulls
*out = new(int32)
**out = **in
}
if in.EvictionHard != nil {
in, out := &in.EvictionHard, &out.EvictionHard
*out = make(map[string]string, len(*in))