Adding a kubelet flag to optionally enable parallel image pulls.

This commit is contained in:
Vishnu kannan 2015-10-20 14:49:44 -07:00
parent 94b45830c3
commit 0df4b46d4c
10 changed files with 317 additions and 58 deletions

View File

@ -146,6 +146,9 @@ type KubeletServer struct {
KubeApiQps float32
KubeApiBurst int
// Pull images one at a time.
SerializeImagePulls bool
}
// bootstrapping interface for kubelet, targets the initialization protocol
@ -185,6 +188,8 @@ func NewKubeletServer() *KubeletServer {
HTTPCheckFrequency: 20 * time.Second,
ImageGCHighThresholdPercent: 90,
ImageGCLowThresholdPercent: 80,
KubeApiQps: 5.0,
KubeApiBurst: 10,
KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"),
LowDiskSpaceThresholdMB: 256,
MasterServiceNamespace: api.NamespaceDefault,
@ -199,6 +204,7 @@ func NewKubeletServer() *KubeletServer {
PodInfraContainerImage: dockertools.PodInfraContainerImage,
Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort,
ReconcileCIDR: true,
RegisterNode: true, // will be ignored if no apiserver is configured
RegisterSchedulable: true,
RegistryBurst: 10,
@ -208,9 +214,6 @@ func NewKubeletServer() *KubeletServer {
RootDirectory: defaultRootDir,
SyncFrequency: 10 * time.Second,
SystemContainer: "",
ReconcileCIDR: true,
KubeApiQps: 5.0,
KubeApiBurst: 10,
}
}
@ -292,6 +295,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.RegisterSchedulable, "register-schedulable", s.RegisterSchedulable, "Register the node as schedulable. No-op if register-node is false. [default=true]")
fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.SerializeImagePulls, "serialize-image-pulls", s.SerializeImagePulls, "Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]")
}
// UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup
@ -413,6 +417,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
RktStage1Image: s.RktStage1Image,
RootDirectory: s.RootDirectory,
Runonce: s.RunOnce,
SerializeImagePulls: s.SerializeImagePulls,
StandaloneMode: (len(s.APIServerList) == 0),
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout,
SyncFrequency: s.SyncFrequency,
@ -672,6 +677,7 @@ func SimpleKubelet(client *client.Client,
ResolverConfig: kubelet.ResolvConfDefault,
ResourceContainer: "/kubelet",
RootDirectory: rootDir,
SerializeImagePulls: true,
SyncFrequency: syncFrequency,
SystemContainer: "",
TLSOptions: tlsOptions,
@ -866,6 +872,7 @@ type KubeletConfig struct {
RktStage1Image string
RootDirectory string
Runonce bool
SerializeImagePulls bool
StandaloneMode bool
StreamingConnectionIdleTimeout time.Duration
SyncFrequency time.Duration
@ -948,7 +955,9 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.ResolverConfig,
kc.CPUCFSQuota,
daemonEndpoints,
kc.OOMAdjuster)
kc.OOMAdjuster,
kc.SerializeImagePulls,
)
if err != nil {
return nil, nil, err

View File

@ -257,6 +257,7 @@ runtime-config
scheduler-config
schema-cache-dir
secure-port
serialize-image-pulls
service-account-key-file
service-account-lookup
service-account-private-key-file

View File

@ -18,7 +18,6 @@ package container
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
@ -26,36 +25,26 @@ import (
"k8s.io/kubernetes/pkg/util"
)
type imagePullRequest struct {
spec ImageSpec
container *api.Container
pullSecrets []api.Secret
logPrefix string
ref *api.ObjectReference
returnChan chan<- error
}
// imagePuller pulls the image using Runtime.PullImage().
// It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly.
type serializedImagePuller struct {
recorder record.EventRecorder
runtime Runtime
backOff *util.Backoff
pullRequests chan *imagePullRequest
type imagePuller struct {
recorder record.EventRecorder
runtime Runtime
backOff *util.Backoff
}
// enforce compatibility.
var _ ImagePuller = &imagePuller{}
// NewImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface.
func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
imagePuller := &serializedImagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
pullRequests: make(chan *imagePullRequest, 10),
func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
return &imagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
}
go util.Until(imagePuller.pullImages, time.Second, util.NeverStop)
return imagePuller
}
// shouldPullImage returns whether we should pull an image according to
@ -74,7 +63,7 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool {
}
// records an event using ref, event msg. log to glog using prefix, msg, logFn
func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
if ref != nil {
puller.recorder.Eventf(ref, event, msg)
} else {
@ -83,7 +72,7 @@ func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, pref
}
// PullImage pulls the image for the specified pod and container.
func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container)
if err != nil {
@ -116,18 +105,8 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
}
// enqueue image pull request and wait for response.
returnChan := make(chan error)
puller.pullRequests <- &imagePullRequest{
spec: spec,
container: container,
pullSecrets: pullSecrets,
logPrefix: logPrefix,
ref: ref,
returnChan: returnChan,
}
if err = <-returnChan; err != nil {
puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
if err := puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable {
@ -141,10 +120,3 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
puller.backOff.GC()
return nil, ""
}
func (puller *serializedImagePuller) pullImages() {
for pullRequest := range puller.pullRequests {
puller.logIt(pullRequest.ref, "Pulling", pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info)
pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets)
}
}

View File

@ -103,7 +103,7 @@ func TestPuller(t *testing.T) {
fakeRuntime := &FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, 0}}
fakeRuntime.Err = c.pullerErr
@ -115,5 +115,6 @@ func TestPuller(t *testing.T) {
fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
}
}
}

View File

@ -0,0 +1,140 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
)
type imagePullRequest struct {
spec ImageSpec
container *api.Container
pullSecrets []api.Secret
logPrefix string
ref *api.ObjectReference
returnChan chan<- error
}
// serializedImagePuller pulls the image using Runtime.PullImage().
// It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly.
type serializedImagePuller struct {
recorder record.EventRecorder
runtime Runtime
backOff *util.Backoff
pullRequests chan *imagePullRequest
}
// enforce compatibility.
var _ ImagePuller = &serializedImagePuller{}
// NewSerializedImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface.
// Pulls one image at a time.
// Issue #10959 has the rationale behind serializing image pulls.
func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
imagePuller := &serializedImagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
pullRequests: make(chan *imagePullRequest, 10),
}
go util.Until(imagePuller.pullImages, time.Second, util.NeverStop)
return imagePuller
}
// records an event using ref, event msg. log to glog using prefix, msg, logFn
func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
if ref != nil {
puller.recorder.Eventf(ref, event, msg)
} else {
logFn(fmt.Sprint(prefix, " ", msg))
}
}
// PullImage pulls the image for the specified pod and container.
func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
spec := ImageSpec{container.Image}
present, err := puller.runtime.IsImagePresent(spec)
if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.logIt(ref, "Failed", logPrefix, msg, glog.Warning)
return ErrImageInspect, msg
}
if !shouldPullImage(container, present) {
if present {
msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
puller.logIt(ref, "Pulled", logPrefix, msg, glog.Info)
return nil, ""
} else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, "ErrImageNeverPull", logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg
}
}
backOffKey := fmt.Sprintf("%s_%s", pod.Name, container.Image)
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
}
// enqueue image pull request and wait for response.
returnChan := make(chan error)
puller.pullRequests <- &imagePullRequest{
spec: spec,
container: container,
pullSecrets: pullSecrets,
logPrefix: logPrefix,
ref: ref,
returnChan: returnChan,
}
if err = <-returnChan; err != nil {
puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable {
msg := fmt.Sprintf("image pull failed for %s because the registry is temporarily unavailable.", container.Image)
return err, msg
} else {
return ErrImagePull, err.Error()
}
}
puller.logIt(ref, "Pulled", logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
puller.backOff.GC()
return nil, ""
}
func (puller *serializedImagePuller) pullImages() {
for pullRequest := range puller.pullRequests {
puller.logIt(pullRequest.ref, "Pulling", pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info)
pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets)
}
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
)
func TestSerializedPuller(t *testing.T) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test_pod",
Namespace: "test-ns",
UID: "bar",
ResourceVersion: "42",
SelfLink: "/api/v1/pods/foo",
}}
cases := []struct {
containerImage string
policy api.PullPolicy
calledFunctions []string
inspectErr error
pullerErr error
expectedErr []error
}{
{ // pull missing image
containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil}},
{ // image present, dont pull
containerImage: "present_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// image present, pull it
{containerImage: "present_image",
policy: api.PullAlways,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// missing image, error PullNever
{containerImage: "missing_image",
policy: api.PullNever,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}},
// missing image, unable to inspect
{containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: errors.New("unknown inspectError"),
pullerErr: nil,
expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}},
// missing image, unable to fetch
{containerImage: "typo_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: errors.New("404"),
expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}},
}
for i, c := range cases {
container := &api.Container{
Name: "container_name",
Image: c.containerImage,
ImagePullPolicy: c.policy,
}
backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()}
backOff.Clock = fakeClock
fakeRuntime := &FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, 0}}
fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr
for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
}
}
}

View File

@ -47,7 +47,7 @@ func NewFakeDockerManager(
fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff)
fakeOOMAdjuster, fakeProcFs, false, imageBackOff, true)
dm.dockerPuller = &FakeDockerPuller{}
return dm
}

View File

@ -161,7 +161,8 @@ func NewDockerManager(
oomAdjuster *oom.OOMAdjuster,
procFs procfs.ProcFsInterface,
cpuCFSQuota bool,
imageBackOff *util.Backoff) *DockerManager {
imageBackOff *util.Backoff,
serializeImagePulls bool) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
@ -215,7 +216,11 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, dm, imageBackOff)
if serializeImagePulls {
dm.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, dm, imageBackOff)
} else {
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff)
}
dm.containerGC = NewContainerGC(client, containerLogsDir)
return dm

View File

@ -184,7 +184,9 @@ func NewMainKubelet(
resolverConfig string,
cpuCFSQuota bool,
daemonEndpoints *api.NodeDaemonEndpoints,
oomAdjuster *oom.OOMAdjuster) (*Kubelet, error) {
oomAdjuster *oom.OOMAdjuster,
serializeImagePulls bool,
) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -335,7 +337,9 @@ func NewMainKubelet(
oomAdjuster,
procFs,
klet.cpuCFSQuota,
imageBackOff)
imageBackOff,
serializeImagePulls,
)
case "rkt":
conf := &rkt.Config{
@ -350,7 +354,9 @@ func NewMainKubelet(
containerRefManager,
klet.livenessManager,
klet.volumeManager,
imageBackOff)
imageBackOff,
serializeImagePulls,
)
if err != nil {
return nil, err
}

View File

@ -109,8 +109,9 @@ func New(config *Config,
containerRefManager *kubecontainer.RefManager,
livenessManager proberesults.Manager,
volumeGetter volumeGetter,
imageBackOff *util.Backoff) (*Runtime, error) {
imageBackOff *util.Backoff,
serializeImagePulls bool,
) (*Runtime, error) {
systemdVersion, err := getSystemdVersion()
if err != nil {
return nil, err
@ -149,7 +150,11 @@ func New(config *Config,
livenessManager: livenessManager,
volumeGetter: volumeGetter,
}
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
if serializeImagePulls {
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
} else {
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
}
// Test the rkt version.
version, err := rkt.Version()