Merge pull request #33228 from yujuhong/pull_throttling

Automatic merge from submit-queue

Move image pull throttling logic to pkg/kubelet/images

This is part of #31458

This allows runtimes in different packages (dockertools, rkt, kuberuntime) to
share the same logic. Before this change, only dockertools support this
feature. Now all three packages support image pull throttling.

/cc @kubernetes/sig-node
This commit is contained in:
Kubernetes Submit Queue 2016-09-22 06:23:39 -07:00 committed by GitHub
commit b60df6c312
12 changed files with 111 additions and 65 deletions

View File

@ -93,17 +93,6 @@ type Runtime interface {
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visble in Runtime.
GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
PullImage(image ImageSpec, pullSecrets []api.Secret) error
// IsImagePresent checks whether the container image is already in the local storage.
IsImagePresent(image ImageSpec) (bool, error)
// Gets all images currently on the machine.
ListImages() ([]Image, error)
// Removes the specified image.
RemoveImage(image ImageSpec) error
// Returns Image statistics.
ImageStats() (*ImageStats, error)
// Returns the filesystem path of the pod's network namespace; if the
// runtime does not handle namespace creation itself, or cannot return
// the network namespace path, it should return an error.
@ -127,6 +116,22 @@ type Runtime interface {
ContainerCommandRunner
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
// ImageService provides methods to image-related methods.
ImageService
}
type ImageService interface {
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
PullImage(image ImageSpec, pullSecrets []api.Secret) error
// IsImagePresent checks whether the container image is already in the local storage.
IsImagePresent(image ImageSpec) (bool, error)
// Gets all images currently on the machine.
ListImages() ([]Image, error)
// Removes the specified image.
RemoveImage(image ImageSpec) error
// Returns Image statistics.
ImageStats() (*ImageStats, error)
}
type ContainerAttacher interface {

View File

@ -38,7 +38,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/leaky"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/parsers"
)
@ -114,25 +113,12 @@ type dockerPuller struct {
keyring credentialprovider.DockerKeyring
}
type throttledDockerPuller struct {
puller dockerPuller
limiter flowcontrol.RateLimiter
}
// newDockerPuller creates a new instance of the default implementation of DockerPuller.
func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
dp := dockerPuller{
func newDockerPuller(client DockerInterface) DockerPuller {
return &dockerPuller{
client: client,
keyring: credentialprovider.NewDockerKeyring(),
}
if qps == 0.0 {
return dp
}
return &throttledDockerPuller{
puller: dp,
limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst),
}
}
func filterHTTPError(err error, image string) error {
@ -285,13 +271,6 @@ func (p dockerPuller) Pull(image string, secrets []api.Secret) error {
return utilerrors.NewAggregate(pullErrs)
}
func (p throttledDockerPuller) Pull(image string, secrets []api.Secret) error {
if p.limiter.TryAccept() {
return p.puller.Pull(image, secrets)
}
return fmt.Errorf("pull QPS exceeded.")
}
func (p dockerPuller) IsImagePresent(image string) (bool, error) {
_, err := p.client.InspectImage(image)
if err == nil {
@ -303,10 +282,6 @@ func (p dockerPuller) IsImagePresent(image string) (bool, error) {
return false, err
}
func (p throttledDockerPuller) IsImagePresent(name string) (bool, error) {
return p.puller.IsImagePresent(name)
}
// Creates a name which can be reversed to identify both full pod name and container name.
// This function returns stable name, unique name and a unique id.
// Although rand.Uint32() is not really unique, but it's enough for us because error will

View File

@ -250,7 +250,7 @@ func NewDockerManager(
os: osInterface,
machineInfo: machineInfo,
podInfraContainerImage: podInfraContainerImage,
dockerPuller: newDockerPuller(client, qps, burst),
dockerPuller: newDockerPuller(client),
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
@ -266,7 +266,7 @@ func NewDockerManager(
seccompProfileRoot: seccompProfileRoot,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls)
dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst)
dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir)
dm.versionCache = cache.NewObjectCache(

View File

@ -0,0 +1,50 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
// throttleImagePulling wraps kubecontainer.ImageService to throttle image
// pulling based on the given QPS and burst limits. If QPS is zero, defaults
// to no throttling.
func throttleImagePulling(imageService kubecontainer.ImageService, qps float32, burst int) kubecontainer.ImageService {
if qps == 0.0 {
return imageService
}
return &throttledImageService{
ImageService: imageService,
limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst),
}
}
type throttledImageService struct {
kubecontainer.ImageService
limiter flowcontrol.RateLimiter
}
func (ts throttledImageService) PullImage(image kubecontainer.ImageSpec, secrets []api.Secret) error {
if ts.limiter.TryAccept() {
return ts.ImageService.PullImage(image, secrets)
}
return fmt.Errorf("pull QPS exceeded.")
}

View File

@ -29,27 +29,29 @@ import (
// imageManager provides the functionalities for image pulling.
type imageManager struct {
recorder record.EventRecorder
runtime kubecontainer.Runtime
backOff *flowcontrol.Backoff
recorder record.EventRecorder
imageService kubecontainer.ImageService
backOff *flowcontrol.Backoff
// It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly.
puller imagePuller
}
var _ ImageManager = &imageManager{}
func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager {
func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, qps float32, burst int) ImageManager {
imageService = throttleImagePulling(imageService, qps, burst)
var puller imagePuller
if serialized {
puller = newSerialImagePuller(runtime)
puller = newSerialImagePuller(imageService)
} else {
puller = newParallelImagePuller(runtime)
puller = newParallelImagePuller(imageService)
}
return &imageManager{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
puller: puller,
recorder: recorder,
imageService: imageService,
backOff: imageBackOff,
puller: puller,
}
}
@ -86,7 +88,7 @@ func (m *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container,
}
spec := kubecontainer.ImageSpec{Image: container.Image}
present, err := m.runtime.IsImagePresent(spec)
present, err := m.imageService.IsImagePresent(spec)
if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
m.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)

View File

@ -106,7 +106,7 @@ func TestParallelPuller(t *testing.T) {
fakeRuntime := &ctest.FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, false)
puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, false, 0, 0)
fakeRuntime.ImageList = []Image{{ID: "present_image", Size: 1}}
fakeRuntime.Err = c.pullerErr
@ -197,7 +197,7 @@ func TestSerializedPuller(t *testing.T) {
fakeRuntime := &ctest.FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, true)
puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, true, 0, 0)
fakeRuntime.ImageList = []Image{{ID: "present_image"}}
fakeRuntime.Err = c.pullerErr

View File

@ -31,16 +31,16 @@ type imagePuller interface {
var _, _ imagePuller = &parallelImagePuller{}, &serialImagePuller{}
type parallelImagePuller struct {
runtime kubecontainer.Runtime
imageService kubecontainer.ImageService
}
func newParallelImagePuller(runtime kubecontainer.Runtime) imagePuller {
return &parallelImagePuller{runtime}
func newParallelImagePuller(imageService kubecontainer.ImageService) imagePuller {
return &parallelImagePuller{imageService}
}
func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []api.Secret, errChan chan<- error) {
go func() {
errChan <- pip.runtime.PullImage(spec, pullSecrets)
errChan <- pip.imageService.PullImage(spec, pullSecrets)
}()
}
@ -48,12 +48,12 @@ func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecr
const maxImagePullRequests = 10
type serialImagePuller struct {
runtime kubecontainer.Runtime
imageService kubecontainer.ImageService
pullRequests chan *imagePullRequest
}
func newSerialImagePuller(runtime kubecontainer.Runtime) imagePuller {
imagePuller := &serialImagePuller{runtime, make(chan *imagePullRequest, maxImagePullRequests)}
func newSerialImagePuller(imageService kubecontainer.ImageService) imagePuller {
imagePuller := &serialImagePuller{imageService, make(chan *imagePullRequest, maxImagePullRequests)}
go wait.Until(imagePuller.processImagePullRequests, time.Second, wait.NeverStop)
return imagePuller
}
@ -74,6 +74,6 @@ func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecret
func (sip *serialImagePuller) processImagePullRequests() {
for pullRequest := range sip.pullRequests {
pullRequest.errChan <- sip.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets)
pullRequest.errChan <- sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets)
}
}

View File

@ -561,6 +561,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
klet.httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
klet.cpuCFSQuota,
dockerService,
dockerService,
@ -625,6 +627,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
kubecontainer.RealOS{},
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
kubeCfg.RuntimeRequestTimeout.Duration,
)
if err != nil {
@ -651,6 +655,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
klet.httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
klet.cpuCFSQuota,
remoteRuntimeService,
remoteImageService,

View File

@ -116,7 +116,10 @@ func NewFakeKubeRuntimeManager(runtimeService internalApi.RuntimeService, imageS
kubecontainer.FilterEventRecorder(recorder),
kubeRuntimeManager,
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false)
false,
0, // Disable image pull throttling by setting QPS to 0,
0,
)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(
&fakeHTTP{},
kubeRuntimeManager,

View File

@ -28,7 +28,6 @@ import (
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
// TODO: pull image with qps and burst, ref https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/dockertools/docker.go#L120
func (m *kubeGenericRuntimeManager) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Secret) error {
img := image.Image
repoToPull, _, _, err := parsers.ParseImageName(img)

View File

@ -109,6 +109,8 @@ func NewKubeGenericRuntimeManager(
httpClient types.HttpGetter,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
imagePullQPS float32,
imagePullBurst int,
cpuCFSQuota bool,
runtimeService internalApi.RuntimeService,
imageService internalApi.ImageManagerService,
@ -160,7 +162,9 @@ func NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(recorder),
kubeRuntimeManager,
imageBackOff,
serializeImagePulls)
serializeImagePulls,
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podGetter, kubeRuntimeManager)

View File

@ -211,6 +211,8 @@ func New(
os kubecontainer.OSInterface,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
imagePullQPS float32,
imagePullBurst int,
requestTimeout time.Duration,
) (*Runtime, error) {
// Create dbus connection.
@ -275,7 +277,7 @@ func New(
rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt)
rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls)
rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls, imagePullQPS, imagePullBurst)
if err := rkt.getVersions(); err != nil {
return nil, fmt.Errorf("rkt: error getting version info: %v", err)