diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 93454b6b9fb..66c3be09367 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -93,17 +93,6 @@ type Runtime interface { // GetPodStatus retrieves the status of the pod, including the // information of all containers in the pod that are visble in Runtime. GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error) - // PullImage pulls an image from the network to local storage using the supplied - // secrets if necessary. - PullImage(image ImageSpec, pullSecrets []api.Secret) error - // IsImagePresent checks whether the container image is already in the local storage. - IsImagePresent(image ImageSpec) (bool, error) - // Gets all images currently on the machine. - ListImages() ([]Image, error) - // Removes the specified image. - RemoveImage(image ImageSpec) error - // Returns Image statistics. - ImageStats() (*ImageStats, error) // Returns the filesystem path of the pod's network namespace; if the // runtime does not handle namespace creation itself, or cannot return // the network namespace path, it should return an error. @@ -127,6 +116,22 @@ type Runtime interface { ContainerCommandRunner // ContainerAttach encapsulates the attaching to containers for testability ContainerAttacher + // ImageService provides methods to image-related methods. + ImageService +} + +type ImageService interface { + // PullImage pulls an image from the network to local storage using the supplied + // secrets if necessary. + PullImage(image ImageSpec, pullSecrets []api.Secret) error + // IsImagePresent checks whether the container image is already in the local storage. + IsImagePresent(image ImageSpec) (bool, error) + // Gets all images currently on the machine. + ListImages() ([]Image, error) + // Removes the specified image. + RemoveImage(image ImageSpec) error + // Returns Image statistics. + ImageStats() (*ImageStats, error) } type ContainerAttacher interface { diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 42bda4dac61..a19c821b5e8 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -38,7 +38,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/leaky" "k8s.io/kubernetes/pkg/types" utilerrors "k8s.io/kubernetes/pkg/util/errors" - "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/parsers" ) @@ -114,25 +113,12 @@ type dockerPuller struct { keyring credentialprovider.DockerKeyring } -type throttledDockerPuller struct { - puller dockerPuller - limiter flowcontrol.RateLimiter -} - // newDockerPuller creates a new instance of the default implementation of DockerPuller. -func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller { - dp := dockerPuller{ +func newDockerPuller(client DockerInterface) DockerPuller { + return &dockerPuller{ client: client, keyring: credentialprovider.NewDockerKeyring(), } - - if qps == 0.0 { - return dp - } - return &throttledDockerPuller{ - puller: dp, - limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst), - } } func filterHTTPError(err error, image string) error { @@ -285,13 +271,6 @@ func (p dockerPuller) Pull(image string, secrets []api.Secret) error { return utilerrors.NewAggregate(pullErrs) } -func (p throttledDockerPuller) Pull(image string, secrets []api.Secret) error { - if p.limiter.TryAccept() { - return p.puller.Pull(image, secrets) - } - return fmt.Errorf("pull QPS exceeded.") -} - func (p dockerPuller) IsImagePresent(image string) (bool, error) { _, err := p.client.InspectImage(image) if err == nil { @@ -303,10 +282,6 @@ func (p dockerPuller) IsImagePresent(image string) (bool, error) { return false, err } -func (p throttledDockerPuller) IsImagePresent(name string) (bool, error) { - return p.puller.IsImagePresent(name) -} - // Creates a name which can be reversed to identify both full pod name and container name. // This function returns stable name, unique name and a unique id. // Although rand.Uint32() is not really unique, but it's enough for us because error will diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 89c7f12d203..d020dbb60ac 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -250,7 +250,7 @@ func NewDockerManager( os: osInterface, machineInfo: machineInfo, podInfraContainerImage: podInfraContainerImage, - dockerPuller: newDockerPuller(client, qps, burst), + dockerPuller: newDockerPuller(client), dockerRoot: dockerRoot, containerLogsDir: containerLogsDir, networkPlugin: networkPlugin, @@ -266,7 +266,7 @@ func NewDockerManager( seccompProfileRoot: seccompProfileRoot, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) - dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls) + dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst) dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir) dm.versionCache = cache.NewObjectCache( diff --git a/pkg/kubelet/images/helpers.go b/pkg/kubelet/images/helpers.go new file mode 100644 index 00000000000..f8fd5f1537e --- /dev/null +++ b/pkg/kubelet/images/helpers.go @@ -0,0 +1,50 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package images + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/flowcontrol" +) + +// throttleImagePulling wraps kubecontainer.ImageService to throttle image +// pulling based on the given QPS and burst limits. If QPS is zero, defaults +// to no throttling. +func throttleImagePulling(imageService kubecontainer.ImageService, qps float32, burst int) kubecontainer.ImageService { + if qps == 0.0 { + return imageService + } + return &throttledImageService{ + ImageService: imageService, + limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst), + } +} + +type throttledImageService struct { + kubecontainer.ImageService + limiter flowcontrol.RateLimiter +} + +func (ts throttledImageService) PullImage(image kubecontainer.ImageSpec, secrets []api.Secret) error { + if ts.limiter.TryAccept() { + return ts.ImageService.PullImage(image, secrets) + } + return fmt.Errorf("pull QPS exceeded.") +} diff --git a/pkg/kubelet/images/image_manager.go b/pkg/kubelet/images/image_manager.go index 5e17cb2c924..cf08f946cc8 100644 --- a/pkg/kubelet/images/image_manager.go +++ b/pkg/kubelet/images/image_manager.go @@ -29,27 +29,29 @@ import ( // imageManager provides the functionalities for image pulling. type imageManager struct { - recorder record.EventRecorder - runtime kubecontainer.Runtime - backOff *flowcontrol.Backoff + recorder record.EventRecorder + imageService kubecontainer.ImageService + backOff *flowcontrol.Backoff // It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly. puller imagePuller } var _ ImageManager = &imageManager{} -func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager { +func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int) ImageManager { + imageService = throttleImagePulling(imageService, qps, burst) + var puller imagePuller if serialized { - puller = newSerialImagePuller(runtime) + puller = newSerialImagePuller(imageService) } else { - puller = newParallelImagePuller(runtime) + puller = newParallelImagePuller(imageService) } return &imageManager{ - recorder: recorder, - runtime: runtime, - backOff: imageBackOff, - puller: puller, + recorder: recorder, + imageService: imageService, + backOff: imageBackOff, + puller: puller, } } @@ -86,7 +88,7 @@ func (m *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, } spec := kubecontainer.ImageSpec{Image: container.Image} - present, err := m.runtime.IsImagePresent(spec) + present, err := m.imageService.IsImagePresent(spec) if err != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) m.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) diff --git a/pkg/kubelet/images/image_manager_test.go b/pkg/kubelet/images/image_manager_test.go index 697a8d05b57..f6a6073f773 100644 --- a/pkg/kubelet/images/image_manager_test.go +++ b/pkg/kubelet/images/image_manager_test.go @@ -106,7 +106,7 @@ func TestParallelPuller(t *testing.T) { fakeRuntime := &ctest.FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, false) + puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, false, 0, 0) fakeRuntime.ImageList = []Image{{ID: "present_image", Size: 1}} fakeRuntime.Err = c.pullerErr @@ -197,7 +197,7 @@ func TestSerializedPuller(t *testing.T) { fakeRuntime := &ctest.FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, true) + puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, true, 0, 0) fakeRuntime.ImageList = []Image{{ID: "present_image"}} fakeRuntime.Err = c.pullerErr diff --git a/pkg/kubelet/images/puller.go b/pkg/kubelet/images/puller.go index 68a70ed0e83..7bcbd0bf9f5 100644 --- a/pkg/kubelet/images/puller.go +++ b/pkg/kubelet/images/puller.go @@ -31,16 +31,16 @@ type imagePuller interface { var _, _ imagePuller = ¶llelImagePuller{}, &serialImagePuller{} type parallelImagePuller struct { - runtime kubecontainer.Runtime + imageService kubecontainer.ImageService } -func newParallelImagePuller(runtime kubecontainer.Runtime) imagePuller { - return ¶llelImagePuller{runtime} +func newParallelImagePuller(imageService kubecontainer.ImageService) imagePuller { + return ¶llelImagePuller{imageService} } func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []api.Secret, errChan chan<- error) { go func() { - errChan <- pip.runtime.PullImage(spec, pullSecrets) + errChan <- pip.imageService.PullImage(spec, pullSecrets) }() } @@ -48,12 +48,12 @@ func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecr const maxImagePullRequests = 10 type serialImagePuller struct { - runtime kubecontainer.Runtime + imageService kubecontainer.ImageService pullRequests chan *imagePullRequest } -func newSerialImagePuller(runtime kubecontainer.Runtime) imagePuller { - imagePuller := &serialImagePuller{runtime, make(chan *imagePullRequest, maxImagePullRequests)} +func newSerialImagePuller(imageService kubecontainer.ImageService) imagePuller { + imagePuller := &serialImagePuller{imageService, make(chan *imagePullRequest, maxImagePullRequests)} go wait.Until(imagePuller.processImagePullRequests, time.Second, wait.NeverStop) return imagePuller } @@ -74,6 +74,6 @@ func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecret func (sip *serialImagePuller) processImagePullRequests() { for pullRequest := range sip.pullRequests { - pullRequest.errChan <- sip.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets) + pullRequest.errChan <- sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets) } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5d1286e4170..fba47cc19e4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -561,6 +561,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub klet.httpClient, imageBackOff, kubeCfg.SerializeImagePulls, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), klet.cpuCFSQuota, dockerService, dockerService, @@ -625,6 +627,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub kubecontainer.RealOS{}, imageBackOff, kubeCfg.SerializeImagePulls, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), kubeCfg.RuntimeRequestTimeout.Duration, ) if err != nil { @@ -651,6 +655,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub klet.httpClient, imageBackOff, kubeCfg.SerializeImagePulls, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), klet.cpuCFSQuota, remoteRuntimeService, remoteImageService, diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index 3e026d9ab08..67cb6103d0d 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -116,7 +116,10 @@ func NewFakeKubeRuntimeManager(runtimeService internalApi.RuntimeService, imageS kubecontainer.FilterEventRecorder(recorder), kubeRuntimeManager, flowcontrol.NewBackOff(time.Second, 300*time.Second), - false) + false, + 0, // Disable image pull throttling by setting QPS to 0, + 0, + ) kubeRuntimeManager.runner = lifecycle.NewHandlerRunner( &fakeHTTP{}, kubeRuntimeManager, diff --git a/pkg/kubelet/kuberuntime/kuberuntime_image.go b/pkg/kubelet/kuberuntime/kuberuntime_image.go index fdf4c21101b..d093c8bbc97 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_image.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_image.go @@ -28,7 +28,6 @@ import ( // PullImage pulls an image from the network to local storage using the supplied // secrets if necessary. -// TODO: pull image with qps and burst, ref https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/dockertools/docker.go#L120 func (m *kubeGenericRuntimeManager) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Secret) error { img := image.Image repoToPull, _, _, err := parsers.ParseImageName(img) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 8a9bb7f7d79..855e6b440af 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -109,6 +109,8 @@ func NewKubeGenericRuntimeManager( httpClient types.HttpGetter, imageBackOff *flowcontrol.Backoff, serializeImagePulls bool, + imagePullQPS float32, + imagePullBurst int, cpuCFSQuota bool, runtimeService internalApi.RuntimeService, imageService internalApi.ImageManagerService, @@ -160,7 +162,9 @@ func NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(recorder), kubeRuntimeManager, imageBackOff, - serializeImagePulls) + serializeImagePulls, + imagePullQPS, + imagePullBurst) kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager) kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podGetter, kubeRuntimeManager) diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 7c0ce8056e3..768c5baf635 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -211,6 +211,8 @@ func New( os kubecontainer.OSInterface, imageBackOff *flowcontrol.Backoff, serializeImagePulls bool, + imagePullQPS float32, + imagePullBurst int, requestTimeout time.Duration, ) (*Runtime, error) { // Create dbus connection. @@ -275,7 +277,7 @@ func New( rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt) - rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls) + rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls, imagePullQPS, imagePullBurst) if err := rkt.getVersions(); err != nil { return nil, fmt.Errorf("rkt: error getting version info: %v", err)