diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go
index 5bff3472ebe..b0f97cede2f 100644
--- a/pkg/generated/openapi/zz_generated.openapi.go
+++ b/pkg/generated/openapi/zz_generated.openapi.go
@@ -57699,6 +57699,13 @@ func schema_k8sio_kubelet_config_v1beta1_KubeletConfiguration(ref common.Referen
 					Format: "",
 				},
 			},
+			"maxParallelImagePulls": {
+				SchemaProps: spec.SchemaProps{
+					Description: "MaxParallelImagePulls sets the maximum number of image pulls in parallel. This field cannot be set if SerializeImagePulls is true. Setting it to nil means no limit. Default: nil",
+					Type: []string{"integer"},
+					Format: "int32",
+				},
+			},
 			"evictionHard": {
 				SchemaProps: spec.SchemaProps{
 					Description: "evictionHard is a map of signal names to quantities that defines hard eviction thresholds. For example: `{\"memory.available\": \"300Mi\"}`. To explicitly disable, pass a 0% or 100% threshold on an arbitrary resource. Default:\n memory.available: \"100Mi\"\n nodefs.available: \"10%\"\n nodefs.inodesFree: \"5%\"\n imagefs.available: \"15%\"",
diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go
index d0e42167091..6c6a5eca91b 100644
--- a/pkg/kubelet/apis/config/helpers_test.go
+++ b/pkg/kubelet/apis/config/helpers_test.go
@@ -266,6 +266,7 @@ var (
 		"RunOnce",
 		"SeccompDefault",
 		"SerializeImagePulls",
+		"MaxParallelImagePulls",
 		"ShowHiddenMetricsForVersion",
 		"ShutdownGracePeriodByPodPriority[*].Priority",
 		"ShutdownGracePeriodByPodPriority[*].ShutdownGracePeriodSeconds",
diff --git a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go
index ce1e279c068..526d89eb6e1 100644
--- a/pkg/kubelet/apis/config/types.go
+++ b/pkg/kubelet/apis/config/types.go
@@ -292,6 +292,8 @@ type KubeletConfiguration struct {
 	KubeAPIBurst int32
 	// serializeImagePulls when enabled, tells the Kubelet to pull images one at a time.
 	SerializeImagePulls bool
+	// MaxParallelImagePulls sets the maximum number of image pulls in parallel.
+	MaxParallelImagePulls *int32
 	// Map of signal names to quantities that defines hard eviction thresholds. For example: {"memory.available": "300Mi"}.
 	// Some default signals are Linux only: nodefs.inodesFree
 	EvictionHard map[string]string
diff --git a/pkg/kubelet/apis/config/v1beta1/defaults.go b/pkg/kubelet/apis/config/v1beta1/defaults.go
index a90386e7a81..37f714b2227 100644
--- a/pkg/kubelet/apis/config/v1beta1/defaults.go
+++ b/pkg/kubelet/apis/config/v1beta1/defaults.go
@@ -206,7 +206,14 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
 		obj.KubeAPIBurst = 10
 	}
 	if obj.SerializeImagePulls == nil {
-		obj.SerializeImagePulls = utilpointer.Bool(true)
+		// SerializeImagePulls defaults to true when MaxParallelImagePulls
+		// is not set, and to false when MaxParallelImagePulls is set to more than 1.
+		// This saves users from having to set both fields.
+ if obj.MaxParallelImagePulls == nil || *obj.MaxParallelImagePulls < 2 { + obj.SerializeImagePulls = utilpointer.Bool(true) + } else { + obj.SerializeImagePulls = utilpointer.Bool(false) + } } if obj.EvictionPressureTransitionPeriod == zeroDuration { obj.EvictionPressureTransitionPeriod = metav1.Duration{Duration: 5 * time.Minute} diff --git a/pkg/kubelet/apis/config/v1beta1/defaults_test.go b/pkg/kubelet/apis/config/v1beta1/defaults_test.go index 079869a0721..7cbc31bb36f 100644 --- a/pkg/kubelet/apis/config/v1beta1/defaults_test.go +++ b/pkg/kubelet/apis/config/v1beta1/defaults_test.go @@ -100,6 +100,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { KubeAPIQPS: utilpointer.Int32(5), KubeAPIBurst: 10, SerializeImagePulls: utilpointer.Bool(true), + MaxParallelImagePulls: nil, EvictionHard: nil, EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute}, EnableControllerAttachDetach: utilpointer.Bool(true), @@ -206,6 +207,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { KubeAPIQPS: utilpointer.Int32(0), KubeAPIBurst: 0, SerializeImagePulls: utilpointer.Bool(false), + MaxParallelImagePulls: nil, EvictionHard: map[string]string{}, EvictionSoft: map[string]string{}, EvictionSoftGracePeriod: map[string]string{}, @@ -314,6 +316,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { KubeAPIQPS: utilpointer.Int32(0), KubeAPIBurst: 10, SerializeImagePulls: utilpointer.Bool(false), + MaxParallelImagePulls: nil, EvictionHard: map[string]string{}, EvictionSoft: map[string]string{}, EvictionSoftGracePeriod: map[string]string{}, @@ -429,6 +432,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { KubeAPIQPS: utilpointer.Int32(1), KubeAPIBurst: 1, SerializeImagePulls: utilpointer.Bool(true), + MaxParallelImagePulls: utilpointer.Int32(5), EvictionHard: map[string]string{ "memory.available": "1Mi", "nodefs.available": "1%", @@ -574,6 +578,7 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { KubeAPIQPS: utilpointer.Int32(1), KubeAPIBurst: 1, SerializeImagePulls: utilpointer.Bool(true), + MaxParallelImagePulls: utilpointer.Int32Ptr(5), EvictionHard: map[string]string{ "memory.available": "1Mi", "nodefs.available": "1%", @@ -704,6 +709,185 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { KubeAPIQPS: utilpointer.Int32(5), KubeAPIBurst: 10, SerializeImagePulls: utilpointer.Bool(true), + MaxParallelImagePulls: nil, + EvictionHard: nil, + EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute}, + EnableControllerAttachDetach: utilpointer.Bool(true), + MakeIPTablesUtilChains: utilpointer.Bool(true), + IPTablesMasqueradeBit: utilpointer.Int32Ptr(DefaultIPTablesMasqueradeBit), + IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit), + FailSwapOn: utilpointer.Bool(true), + ContainerLogMaxSize: "10Mi", + ContainerLogMaxFiles: utilpointer.Int32Ptr(5), + ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, + EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement, + VolumePluginDir: DefaultVolumePluginDir, + Logging: logsapi.LoggingConfiguration{ + Format: "text", + FlushFrequency: 5 * time.Second, + }, + EnableSystemLogHandler: utilpointer.Bool(true), + EnableProfilingHandler: utilpointer.Bool(true), + EnableDebugFlagsHandler: utilpointer.Bool(true), + SeccompDefault: utilpointer.Bool(false), + MemoryThrottlingFactor: utilpointer.Float64Ptr(DefaultMemoryThrottlingFactor), + RegisterNode: utilpointer.Bool(true), + LocalStorageCapacityIsolation: utilpointer.Bool(true), + }, + }, + { + 
"SerializeImagePull defaults to false when MaxParallelImagePulls is larger than 1", + &v1beta1.KubeletConfiguration{ + MaxParallelImagePulls: utilpointer.Int32(5), + }, + &v1beta1.KubeletConfiguration{ + EnableServer: utilpointer.Bool(true), + SyncFrequency: metav1.Duration{Duration: 1 * time.Minute}, + FileCheckFrequency: metav1.Duration{Duration: 20 * time.Second}, + HTTPCheckFrequency: metav1.Duration{Duration: 20 * time.Second}, + Address: "0.0.0.0", + Port: ports.KubeletPort, + Authentication: v1beta1.KubeletAuthentication{ + Anonymous: v1beta1.KubeletAnonymousAuthentication{Enabled: utilpointer.Bool(false)}, + Webhook: v1beta1.KubeletWebhookAuthentication{ + Enabled: utilpointer.Bool(true), + CacheTTL: metav1.Duration{Duration: 2 * time.Minute}, + }, + }, + Authorization: v1beta1.KubeletAuthorization{ + Mode: v1beta1.KubeletAuthorizationModeWebhook, + Webhook: v1beta1.KubeletWebhookAuthorization{ + CacheAuthorizedTTL: metav1.Duration{Duration: 5 * time.Minute}, + CacheUnauthorizedTTL: metav1.Duration{Duration: 30 * time.Second}, + }, + }, + RegistryPullQPS: utilpointer.Int32Ptr(5), + RegistryBurst: 10, + EventRecordQPS: utilpointer.Int32Ptr(5), + EventBurst: 10, + EnableDebuggingHandlers: utilpointer.Bool(true), + HealthzPort: utilpointer.Int32Ptr(10248), + HealthzBindAddress: "127.0.0.1", + OOMScoreAdj: utilpointer.Int32Ptr(int32(qos.KubeletOOMScoreAdj)), + StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour}, + NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second}, + NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute}, + NodeLeaseDurationSeconds: 40, + ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock", + ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute}, + ImageGCHighThresholdPercent: utilpointer.Int32Ptr(85), + ImageGCLowThresholdPercent: utilpointer.Int32Ptr(80), + VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute}, + CgroupsPerQOS: utilpointer.Bool(true), + CgroupDriver: "cgroupfs", + CPUManagerPolicy: "none", + CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second}, + MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy, + TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy, + TopologyManagerScope: v1beta1.ContainerTopologyManagerScope, + RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute}, + HairpinMode: v1beta1.PromiscuousBridge, + MaxPods: 110, + PodPidsLimit: utilpointer.Int64(-1), + ResolverConfig: utilpointer.String(kubetypes.ResolvConfDefault), + CPUCFSQuota: utilpointer.Bool(true), + CPUCFSQuotaPeriod: &metav1.Duration{Duration: 100 * time.Millisecond}, + NodeStatusMaxImages: utilpointer.Int32Ptr(50), + MaxOpenFiles: 1000000, + ContentType: "application/vnd.kubernetes.protobuf", + KubeAPIQPS: utilpointer.Int32Ptr(5), + KubeAPIBurst: 10, + SerializeImagePulls: utilpointer.Bool(false), + MaxParallelImagePulls: utilpointer.Int32(5), + EvictionHard: nil, + EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute}, + EnableControllerAttachDetach: utilpointer.Bool(true), + MakeIPTablesUtilChains: utilpointer.Bool(true), + IPTablesMasqueradeBit: utilpointer.Int32Ptr(DefaultIPTablesMasqueradeBit), + IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit), + FailSwapOn: utilpointer.Bool(true), + ContainerLogMaxSize: "10Mi", + ContainerLogMaxFiles: utilpointer.Int32Ptr(5), + ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, + EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement, + 
VolumePluginDir: DefaultVolumePluginDir, + Logging: logsapi.LoggingConfiguration{ + Format: "text", + FlushFrequency: 5 * time.Second, + }, + EnableSystemLogHandler: utilpointer.Bool(true), + EnableProfilingHandler: utilpointer.Bool(true), + EnableDebugFlagsHandler: utilpointer.Bool(true), + SeccompDefault: utilpointer.Bool(false), + MemoryThrottlingFactor: utilpointer.Float64Ptr(DefaultMemoryThrottlingFactor), + RegisterNode: utilpointer.Bool(true), + LocalStorageCapacityIsolation: utilpointer.Bool(true), + }, + }, + { + "SerializeImagePull defaults to true when MaxParallelImagePulls is set to 1", + &v1beta1.KubeletConfiguration{ + MaxParallelImagePulls: utilpointer.Int32(1), + }, + &v1beta1.KubeletConfiguration{ + EnableServer: utilpointer.Bool(true), + SyncFrequency: metav1.Duration{Duration: 1 * time.Minute}, + FileCheckFrequency: metav1.Duration{Duration: 20 * time.Second}, + HTTPCheckFrequency: metav1.Duration{Duration: 20 * time.Second}, + Address: "0.0.0.0", + Port: ports.KubeletPort, + Authentication: v1beta1.KubeletAuthentication{ + Anonymous: v1beta1.KubeletAnonymousAuthentication{Enabled: utilpointer.Bool(false)}, + Webhook: v1beta1.KubeletWebhookAuthentication{ + Enabled: utilpointer.Bool(true), + CacheTTL: metav1.Duration{Duration: 2 * time.Minute}, + }, + }, + Authorization: v1beta1.KubeletAuthorization{ + Mode: v1beta1.KubeletAuthorizationModeWebhook, + Webhook: v1beta1.KubeletWebhookAuthorization{ + CacheAuthorizedTTL: metav1.Duration{Duration: 5 * time.Minute}, + CacheUnauthorizedTTL: metav1.Duration{Duration: 30 * time.Second}, + }, + }, + RegistryPullQPS: utilpointer.Int32Ptr(5), + RegistryBurst: 10, + EventRecordQPS: utilpointer.Int32Ptr(5), + EventBurst: 10, + EnableDebuggingHandlers: utilpointer.Bool(true), + HealthzPort: utilpointer.Int32Ptr(10248), + HealthzBindAddress: "127.0.0.1", + OOMScoreAdj: utilpointer.Int32Ptr(int32(qos.KubeletOOMScoreAdj)), + StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour}, + NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second}, + NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute}, + NodeLeaseDurationSeconds: 40, + ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock", + ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute}, + ImageGCHighThresholdPercent: utilpointer.Int32Ptr(85), + ImageGCLowThresholdPercent: utilpointer.Int32Ptr(80), + VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute}, + CgroupsPerQOS: utilpointer.Bool(true), + CgroupDriver: "cgroupfs", + CPUManagerPolicy: "none", + CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second}, + MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy, + TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy, + TopologyManagerScope: v1beta1.ContainerTopologyManagerScope, + RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute}, + HairpinMode: v1beta1.PromiscuousBridge, + MaxPods: 110, + PodPidsLimit: utilpointer.Int64(-1), + ResolverConfig: utilpointer.String(kubetypes.ResolvConfDefault), + CPUCFSQuota: utilpointer.Bool(true), + CPUCFSQuotaPeriod: &metav1.Duration{Duration: 100 * time.Millisecond}, + NodeStatusMaxImages: utilpointer.Int32Ptr(50), + MaxOpenFiles: 1000000, + ContentType: "application/vnd.kubernetes.protobuf", + KubeAPIQPS: utilpointer.Int32Ptr(5), + KubeAPIBurst: 10, + SerializeImagePulls: utilpointer.Bool(true), + MaxParallelImagePulls: utilpointer.Int32(1), EvictionHard: nil, EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute}, 
EnableControllerAttachDetach: utilpointer.Bool(true), diff --git a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go index 43f08cc63c2..6d8162dbdbe 100644 --- a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go @@ -443,6 +443,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in if err := v1.Convert_Pointer_bool_To_bool(&in.SerializeImagePulls, &out.SerializeImagePulls, s); err != nil { return err } + out.MaxParallelImagePulls = (*int32)(unsafe.Pointer(in.MaxParallelImagePulls)) out.EvictionHard = *(*map[string]string)(unsafe.Pointer(&in.EvictionHard)) out.EvictionSoft = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoft)) out.EvictionSoftGracePeriod = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoftGracePeriod)) @@ -626,6 +627,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in if err := v1.Convert_bool_To_Pointer_bool(&in.SerializeImagePulls, &out.SerializeImagePulls, s); err != nil { return err } + out.MaxParallelImagePulls = (*int32)(unsafe.Pointer(in.MaxParallelImagePulls)) out.EvictionHard = *(*map[string]string)(unsafe.Pointer(&in.EvictionHard)) out.EvictionSoft = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoft)) out.EvictionSoftGracePeriod = *(*map[string]string)(unsafe.Pointer(&in.EvictionSoftGracePeriod)) diff --git a/pkg/kubelet/apis/config/validation/validation.go b/pkg/kubelet/apis/config/validation/validation.go index aa4c2c5fcfa..1da3b060b1a 100644 --- a/pkg/kubelet/apis/config/validation/validation.go +++ b/pkg/kubelet/apis/config/validation/validation.go @@ -122,6 +122,12 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur if kc.RegistryPullQPS < 0 { allErrors = append(allErrors, fmt.Errorf("invalid configuration: registryPullQPS (--registry-qps) %v must not be a negative number", kc.RegistryPullQPS)) } + if kc.MaxParallelImagePulls != nil && *kc.MaxParallelImagePulls < 1 { + allErrors = append(allErrors, fmt.Errorf("invalid configuration: maxParallelImagePulls %v must be a positive number", *kc.MaxParallelImagePulls)) + } + if kc.SerializeImagePulls && kc.MaxParallelImagePulls != nil && *kc.MaxParallelImagePulls > 1 { + allErrors = append(allErrors, fmt.Errorf("invalid configuration: maxParallelImagePulls cannot be larger than 1 unless SerializeImagePulls (--serialize-image-pulls) is set to false")) + } if kc.ServerTLSBootstrap && !localFeatureGate.Enabled(features.RotateKubeletServerCertificate) { allErrors = append(allErrors, fmt.Errorf("invalid configuration: serverTLSBootstrap %v requires feature gate RotateKubeletServerCertificate", kc.ServerTLSBootstrap)) } diff --git a/pkg/kubelet/apis/config/validation/validation_test.go b/pkg/kubelet/apis/config/validation/validation_test.go index 3df8179e216..db86577b6b5 100644 --- a/pkg/kubelet/apis/config/validation/validation_test.go +++ b/pkg/kubelet/apis/config/validation/validation_test.go @@ -57,6 +57,7 @@ var ( ReadOnlyPort: 0, RegistryBurst: 10, RegistryPullQPS: 5, + MaxParallelImagePulls: nil, HairpinMode: kubeletconfig.PromiscuousBridge, NodeLeaseDurationSeconds: 1, CPUCFSQuotaPeriod: metav1.Duration{Duration: 25 * time.Millisecond}, @@ -298,6 +299,31 @@ func TestValidateKubeletConfiguration(t *testing.T) { }, errMsg: "invalid configuration: registryPullQPS (--registry-qps) -1 must not be a negative number", }, + { + name: "invalid MaxParallelImagePulls", + configure: 
func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + conf.MaxParallelImagePulls = utilpointer.Int32(0) + return conf + }, + errMsg: "invalid configuration: maxParallelImagePulls 0 must be a positive number", + }, + { + name: "invalid MaxParallelImagePulls and SerializeImagePulls combination", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + conf.MaxParallelImagePulls = utilpointer.Int32(3) + conf.SerializeImagePulls = true + return conf + }, + errMsg: "invalid configuration: maxParallelImagePulls cannot be larger than 1 unless SerializeImagePulls (--serialize-image-pulls) is set to false", + }, + { + name: "valid MaxParallelImagePulls and SerializeImagePulls combination", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + conf.MaxParallelImagePulls = utilpointer.Int32(1) + conf.SerializeImagePulls = true + return conf + }, + }, { name: "specify ServerTLSBootstrap without enabling RotateKubeletServerCertificate", configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { diff --git a/pkg/kubelet/apis/config/zz_generated.deepcopy.go b/pkg/kubelet/apis/config/zz_generated.deepcopy.go index 9436182f325..a4af47e4a83 100644 --- a/pkg/kubelet/apis/config/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/config/zz_generated.deepcopy.go @@ -227,6 +227,11 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { } out.RuntimeRequestTimeout = in.RuntimeRequestTimeout out.CPUCFSQuotaPeriod = in.CPUCFSQuotaPeriod + if in.MaxParallelImagePulls != nil { + in, out := &in.MaxParallelImagePulls, &out.MaxParallelImagePulls + *out = new(int32) + **out = **in + } if in.EvictionHard != nil { in, out := &in.EvictionHard, &out.EvictionHard *out = make(map[string]string, len(*in)) diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 1d13f25a37b..3b89f5c6560 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -58,7 +58,12 @@ type FakeRuntime struct { Err error InspectErr error StatusErr error - T *testing.T + // If BlockImagePulls is true, then all PullImage() calls will be blocked until + // UnblockImagePulls() is called. This is used to simulate image pull latency + // from container runtime. + BlockImagePulls bool + imagePullTokenBucket chan bool + T *testing.T } const FakeHost = "localhost:12345" @@ -129,6 +134,17 @@ func (f *FakeRuntime) ClearCalls() { f.Err = nil f.InspectErr = nil f.StatusErr = nil + f.BlockImagePulls = false + if f.imagePullTokenBucket != nil { + for { + select { + case f.imagePullTokenBucket <- true: + default: + f.imagePullTokenBucket = nil + return + } + } + } } // UpdatePodCIDR fulfills the cri interface. 
@@ -151,6 +167,23 @@ func (f *FakeRuntime) AssertCalls(calls []string) bool {
 	return f.assertList(calls, f.CalledFunctions)
 }
 
+// AssertCallCounts checks whether a given function was called the expected number of times.
+func (f *FakeRuntime) AssertCallCounts(funcName string, expectedCount int) bool {
+	f.Lock()
+	defer f.Unlock()
+	actualCount := 0
+	for _, c := range f.CalledFunctions {
+		if funcName == c {
+			actualCount += 1
+		}
+	}
+	if expectedCount != actualCount {
+		f.T.Errorf("AssertCallCounts: expected %s to be called %d times, but was actually called %d times.", funcName, expectedCount, actualCount)
+		return false
+	}
+	return true
+}
+
 func (f *FakeRuntime) AssertStartedPods(pods []string) bool {
 	f.Lock()
 	defer f.Unlock()
@@ -302,10 +335,8 @@ func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, container
 	return f.Err
 }
 
-func (f *FakeRuntime) PullImage(_ context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
+func (f *FakeRuntime) PullImage(ctx context.Context, image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
 	f.Lock()
-	defer f.Unlock()
-
 	f.CalledFunctions = append(f.CalledFunctions, "PullImage")
 	if f.Err == nil {
 		i := kubecontainer.Image{
@@ -314,7 +345,35 @@
 		}
 		f.ImageList = append(f.ImageList, i)
 	}
-	return image.Image, f.Err
+
+	if !f.BlockImagePulls {
+		f.Unlock()
+		return image.Image, f.Err
+	}
+
+	retErr := f.Err
+	if f.imagePullTokenBucket == nil {
+		f.imagePullTokenBucket = make(chan bool, 1)
+	}
+	// Unlock before waiting for UnblockImagePulls calls, to avoid deadlock.
+	f.Unlock()
+	select {
+	case <-ctx.Done():
+	case <-f.imagePullTokenBucket:
+	}
+	return image.Image, retErr
+}
+
+// UnblockImagePulls unblocks a certain number of image pulls, if BlockImagePulls is true.
+func (f *FakeRuntime) UnblockImagePulls(count int) {
+	if f.imagePullTokenBucket != nil {
+		for i := 0; i < count; i++ {
+			select {
+			case f.imagePullTokenBucket <- true:
+			default:
+			}
+		}
+	}
+}
 
 func (f *FakeRuntime) GetImageRef(_ context.Context, image kubecontainer.ImageSpec) (string, error) {
diff --git a/pkg/kubelet/images/image_manager.go b/pkg/kubelet/images/image_manager.go
index 3fbec1f2b56..3a46d4bd731 100644
--- a/pkg/kubelet/images/image_manager.go
+++ b/pkg/kubelet/images/image_manager.go
@@ -52,14 +52,14 @@ type imageManager struct {
 
 var _ ImageManager = &imageManager{}
 
 // NewImageManager instantiates a new ImageManager object.
-func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager { +func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, maxParallelImagePulls *int32, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager { imageService = throttleImagePulling(imageService, qps, burst) var puller imagePuller if serialized { puller = newSerialImagePuller(imageService) } else { - puller = newParallelImagePuller(imageService) + puller = newParallelImagePuller(imageService, maxParallelImagePulls) } return &imageManager{ recorder: recorder, diff --git a/pkg/kubelet/images/image_manager_test.go b/pkg/kubelet/images/image_manager_test.go index 3a6d14eb00b..ab0cb3e7946 100644 --- a/pkg/kubelet/images/image_manager_test.go +++ b/pkg/kubelet/images/image_manager_test.go @@ -19,6 +19,7 @@ package images import ( "context" "errors" + "sync" "testing" "time" @@ -31,6 +32,7 @@ import ( . "k8s.io/kubernetes/pkg/kubelet/container" ctest "k8s.io/kubernetes/pkg/kubelet/container/testing" testingclock "k8s.io/utils/clock/testing" + utilpointer "k8s.io/utils/pointer" ) type pullerExpects struct { @@ -166,7 +168,7 @@ func (m *mockPodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID) func (m *mockPodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {} -func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fakeClock *testingclock.FakeClock, fakeRuntime *ctest.FakeRuntime, container *v1.Container) { +func pullerTestEnv(c pullerTestCase, serialized bool, maxParallelImagePulls *int32) (puller ImageManager, fakeClock *testingclock.FakeClock, fakeRuntime *ctest.FakeRuntime, container *v1.Container) { container = &v1.Container{ Name: "container_name", Image: c.containerImage, @@ -184,7 +186,7 @@ func pullerTestEnv(c pullerTestCase, serialized bool) (puller ImageManager, fake fakeRuntime.Err = c.pullerErr fakeRuntime.InspectErr = c.inspectErr - puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, c.qps, c.burst, &mockPodPullingTimeRecorder{}) + puller = NewImageManager(fakeRecorder, fakeRuntime, backOff, serialized, maxParallelImagePulls, c.qps, c.burst, &mockPodPullingTimeRecorder{}) return } @@ -201,7 +203,7 @@ func TestParallelPuller(t *testing.T) { useSerializedEnv := false for _, c := range cases { - puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv) + puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv, nil) t.Run(c.testName, func(t *testing.T) { ctx := context.Background() @@ -229,7 +231,7 @@ func TestSerializedPuller(t *testing.T) { useSerializedEnv := true for _, c := range cases { - puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv) + puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv, nil) t.Run(c.testName, func(t *testing.T) { ctx := context.Background() @@ -287,7 +289,7 @@ func TestPullAndListImageWithPodAnnotations(t *testing.T) { }} useSerializedEnv := true - puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv) + puller, fakeClock, fakeRuntime, container := pullerTestEnv(c, useSerializedEnv, nil) fakeRuntime.CalledFunctions = nil fakeRuntime.ImageList = []Image{} fakeClock.Step(time.Second) @@ -312,3 +314,69 @@ func 
TestPullAndListImageWithPodAnnotations(t *testing.T) { assert.Equal(t, expectedAnnotations, image.Spec.Annotations, "image spec annotations") }) } + +func TestMaxParallelImagePullsLimit(t *testing.T) { + ctx := context.Background() + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_pod", + Namespace: "test-ns", + UID: "bar", + ResourceVersion: "42", + }} + + testCase := &pullerTestCase{ + containerImage: "present_image", + testName: "image present, pull ", + policy: v1.PullAlways, + inspectErr: nil, + pullerErr: nil, + qps: 0.0, + burst: 0, + } + + useSerializedEnv := false + maxParallelImagePulls := 5 + var wg sync.WaitGroup + + puller, fakeClock, fakeRuntime, container := pullerTestEnv(*testCase, useSerializedEnv, utilpointer.Int32Ptr(int32(maxParallelImagePulls))) + fakeRuntime.BlockImagePulls = true + fakeRuntime.CalledFunctions = nil + fakeRuntime.T = t + fakeClock.Step(time.Second) + + // First 5 EnsureImageExists should result in runtime calls + for i := 0; i < maxParallelImagePulls; i++ { + wg.Add(1) + go func() { + _, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil) + assert.Nil(t, err) + wg.Done() + }() + } + time.Sleep(1 * time.Second) + fakeRuntime.AssertCallCounts("PullImage", 5) + + // Next two EnsureImageExists should be blocked because maxParallelImagePulls is hit + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + _, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil) + assert.Nil(t, err) + wg.Done() + }() + } + time.Sleep(1 * time.Second) + fakeRuntime.AssertCallCounts("PullImage", 5) + + // Unblock two image pulls from runtime, and two EnsureImageExists can go through + fakeRuntime.UnblockImagePulls(2) + time.Sleep(1 * time.Second) + fakeRuntime.AssertCallCounts("PullImage", 7) + + // Unblock the remaining 5 image pulls from runtime, and all EnsureImageExists can go through + fakeRuntime.UnblockImagePulls(5) + + wg.Wait() + fakeRuntime.AssertCallCounts("PullImage", 7) +} diff --git a/pkg/kubelet/images/puller.go b/pkg/kubelet/images/puller.go index 2f0bda92cb0..fd5ec1920f3 100644 --- a/pkg/kubelet/images/puller.go +++ b/pkg/kubelet/images/puller.go @@ -40,14 +40,22 @@ var _, _ imagePuller = ¶llelImagePuller{}, &serialImagePuller{} type parallelImagePuller struct { imageService kubecontainer.ImageService + tokens chan struct{} } -func newParallelImagePuller(imageService kubecontainer.ImageService) imagePuller { - return ¶llelImagePuller{imageService} +func newParallelImagePuller(imageService kubecontainer.ImageService, maxParallelImagePulls *int32) imagePuller { + if maxParallelImagePulls == nil || *maxParallelImagePulls < 1 { + return ¶llelImagePuller{imageService, nil} + } + return ¶llelImagePuller{imageService, make(chan struct{}, *maxParallelImagePulls)} } func (pip *parallelImagePuller) pullImage(ctx context.Context, spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) { go func() { + if pip.tokens != nil { + pip.tokens <- struct{}{} + defer func() { <-pip.tokens }() + } startTime := time.Now() imageRef, err := pip.imageService.PullImage(ctx, spec, pullSecrets, podSandboxConfig) pullChan <- pullResult{ diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8a08a8c98dd..c9ac91b9f2a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -660,6 +660,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, insecureContainerLifecycleHTTPClient, imageBackOff, kubeCfg.SerializeImagePulls, + 
kubeCfg.MaxParallelImagePulls, float32(kubeCfg.RegistryPullQPS), int(kubeCfg.RegistryBurst), imageCredentialProviderConfigFile, diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index c1c14e4f530..30a340c0952 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/logs" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" + utilpointer "k8s.io/utils/pointer" ) const ( @@ -129,7 +130,8 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS kubeRuntimeManager, flowcontrol.NewBackOff(time.Second, 300*time.Second), false, - 0, // Disable image pull throttling by setting QPS to 0, + utilpointer.Int32Ptr(0), // No limit on max parallel image pulls, + 0, // Disable image pull throttling by setting QPS to 0, 0, &fakePodPullingTimeRecorder{}, ) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 82eaa67e7a2..389e1b151d3 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -188,6 +188,7 @@ func NewKubeGenericRuntimeManager( insecureContainerLifecycleHTTPClient types.HTTPDoer, imageBackOff *flowcontrol.Backoff, serializeImagePulls bool, + maxParallelImagePulls *int32, imagePullQPS float32, imagePullBurst int, imageCredentialProviderConfigFile string, @@ -275,6 +276,7 @@ func NewKubeGenericRuntimeManager( kubeRuntimeManager, imageBackOff, serializeImagePulls, + maxParallelImagePulls, imagePullQPS, imagePullBurst, podPullingTimeRecorder) diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/types.go b/staging/src/k8s.io/kubelet/config/v1beta1/types.go index b8ef28d3484..0e80321791e 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/types.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/types.go @@ -482,6 +482,12 @@ type KubeletConfiguration struct { // Default: true // +optional SerializeImagePulls *bool `json:"serializeImagePulls,omitempty"` + // MaxParallelImagePulls sets the maximum number of image pulls in parallel. + // This field cannot be set if SerializeImagePulls is true. + // Setting it to nil means no limit. + // Default: nil + // +optional + MaxParallelImagePulls *int32 `json:"maxParallelImagePulls,omitempty"` // evictionHard is a map of signal names to quantities that defines hard eviction // thresholds. For example: `{"memory.available": "300Mi"}`. // To explicitly disable, pass a 0% or 100% threshold on an arbitrary resource. diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go index d3a3ceca887..f5b2819e61f 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go @@ -311,6 +311,11 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { *out = new(bool) **out = **in } + if in.MaxParallelImagePulls != nil { + in, out := &in.MaxParallelImagePulls, &out.MaxParallelImagePulls + *out = new(int32) + **out = **in + } if in.EvictionHard != nil { in, out := &in.EvictionHard, &out.EvictionHard *out = make(map[string]string, len(*in))
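
Note on the concurrency model introduced above: serializeImagePulls must be false (which it now defaults to when maxParallelImagePulls is greater than 1) for the parallel puller to be used, and newParallelImagePuller then bounds concurrency with a buffered channel used as a semaphore. The following is a minimal, self-contained Go sketch of that pattern, for illustration only; it is not kubelet code, and boundedPuller, newBoundedPuller, and the simulated pull delay are hypothetical stand-ins.

// Minimal standalone sketch (assumption: illustrates the same buffered-channel
// semaphore shape as newParallelImagePuller): a nil channel means "no limit";
// otherwise at most cap(tokens) pulls run at once.
package main

import (
	"fmt"
	"sync"
	"time"
)

type boundedPuller struct {
	tokens chan struct{} // nil means unlimited parallel pulls
}

func newBoundedPuller(maxParallel *int32) *boundedPuller {
	if maxParallel == nil || *maxParallel < 1 {
		return &boundedPuller{tokens: nil}
	}
	return &boundedPuller{tokens: make(chan struct{}, *maxParallel)}
}

func (p *boundedPuller) pull(image string, done func(string)) {
	go func() {
		if p.tokens != nil {
			p.tokens <- struct{}{}        // blocks while maxParallel pulls are in flight
			defer func() { <-p.tokens }() // frees the slot when this pull finishes
		}
		time.Sleep(100 * time.Millisecond) // stand-in for the real image pull
		done(image)
	}()
}

func main() {
	max := int32(2)
	p := newBoundedPuller(&max)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		p.pull(fmt.Sprintf("image-%d", i), func(img string) {
			fmt.Println("pulled", img)
			wg.Done()
		})
	}
	wg.Wait() // with max=2, no more than two pulls overlap at any moment
}

The validation rule added in validation.go protects the same shape: maxParallelImagePulls greater than 1 is only allowed when serializeImagePulls is false, so the two knobs cannot contradict each other.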