From 0df4b46d4ca8e6244455c8199a06ee0fe960ced6 Mon Sep 17 00:00:00 2001 From: Vishnu kannan Date: Tue, 20 Oct 2015 14:49:44 -0700 Subject: [PATCH] Adding a kubelet flag to optionally enable parallel image pulls. --- cmd/kubelet/app/server.go | 17 ++- hack/verify-flags/known-flags.txt | 1 + pkg/kubelet/container/image_puller.go | 60 ++------ pkg/kubelet/container/image_puller_test.go | 3 +- .../container/serialized_image_puller.go | 140 ++++++++++++++++++ .../container/serialized_image_puller_test.go | 120 +++++++++++++++ pkg/kubelet/dockertools/fake_manager.go | 2 +- pkg/kubelet/dockertools/manager.go | 9 +- pkg/kubelet/kubelet.go | 12 +- pkg/kubelet/rkt/rkt.go | 11 +- 10 files changed, 317 insertions(+), 58 deletions(-) create mode 100644 pkg/kubelet/container/serialized_image_puller.go create mode 100644 pkg/kubelet/container/serialized_image_puller_test.go diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 405ce99ecda..cc1ffe58dc4 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -146,6 +146,9 @@ type KubeletServer struct { KubeApiQps float32 KubeApiBurst int + + // Pull images one at a time. + SerializeImagePulls bool } // bootstrapping interface for kubelet, targets the initialization protocol @@ -185,6 +188,8 @@ func NewKubeletServer() *KubeletServer { HTTPCheckFrequency: 20 * time.Second, ImageGCHighThresholdPercent: 90, ImageGCLowThresholdPercent: 80, + KubeApiQps: 5.0, + KubeApiBurst: 10, KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"), LowDiskSpaceThresholdMB: 256, MasterServiceNamespace: api.NamespaceDefault, @@ -199,6 +204,7 @@ func NewKubeletServer() *KubeletServer { PodInfraContainerImage: dockertools.PodInfraContainerImage, Port: ports.KubeletPort, ReadOnlyPort: ports.KubeletReadOnlyPort, + ReconcileCIDR: true, RegisterNode: true, // will be ignored if no apiserver is configured RegisterSchedulable: true, RegistryBurst: 10, @@ -208,9 +214,6 @@ func NewKubeletServer() *KubeletServer { RootDirectory: defaultRootDir, SyncFrequency: 10 * time.Second, SystemContainer: "", - ReconcileCIDR: true, - KubeApiQps: 5.0, - KubeApiBurst: 10, } } @@ -292,6 +295,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.RegisterSchedulable, "register-schedulable", s.RegisterSchedulable, "Register the node as schedulable. No-op if register-node is false. [default=true]") fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver") + fs.BoolVar(&s.SerializeImagePulls, "serialize-image-pulls", s.SerializeImagePulls, "Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]") } // UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup @@ -413,6 +417,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) { RktStage1Image: s.RktStage1Image, RootDirectory: s.RootDirectory, Runonce: s.RunOnce, + SerializeImagePulls: s.SerializeImagePulls, StandaloneMode: (len(s.APIServerList) == 0), StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout, SyncFrequency: s.SyncFrequency, @@ -672,6 +677,7 @@ func SimpleKubelet(client *client.Client, ResolverConfig: kubelet.ResolvConfDefault, ResourceContainer: "/kubelet", RootDirectory: rootDir, + SerializeImagePulls: true, SyncFrequency: syncFrequency, SystemContainer: "", TLSOptions: tlsOptions, @@ -866,6 +872,7 @@ type KubeletConfig struct { RktStage1Image string RootDirectory string Runonce bool + SerializeImagePulls bool StandaloneMode bool StreamingConnectionIdleTimeout time.Duration SyncFrequency time.Duration @@ -948,7 +955,9 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.ResolverConfig, kc.CPUCFSQuota, daemonEndpoints, - kc.OOMAdjuster) + kc.OOMAdjuster, + kc.SerializeImagePulls, + ) if err != nil { return nil, nil, err diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index b44930ec655..d93dfec29ee 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -257,6 +257,7 @@ runtime-config scheduler-config schema-cache-dir secure-port +serialize-image-pulls service-account-key-file service-account-lookup service-account-private-key-file diff --git a/pkg/kubelet/container/image_puller.go b/pkg/kubelet/container/image_puller.go index 76b0fb812f0..7b417fad6da 100644 --- a/pkg/kubelet/container/image_puller.go +++ b/pkg/kubelet/container/image_puller.go @@ -18,7 +18,6 @@ package container import ( "fmt" - "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" @@ -26,36 +25,26 @@ import ( "k8s.io/kubernetes/pkg/util" ) -type imagePullRequest struct { - spec ImageSpec - container *api.Container - pullSecrets []api.Secret - logPrefix string - ref *api.ObjectReference - returnChan chan<- error -} - // imagePuller pulls the image using Runtime.PullImage(). // It will check the presence of the image, and report the 'image pulling', // 'image pulled' events correspondingly. -type serializedImagePuller struct { - recorder record.EventRecorder - runtime Runtime - backOff *util.Backoff - pullRequests chan *imagePullRequest +type imagePuller struct { + recorder record.EventRecorder + runtime Runtime + backOff *util.Backoff } +// enforce compatibility. +var _ ImagePuller = &imagePuller{} + // NewImagePuller takes an event recorder and container runtime to create a // image puller that wraps the container runtime's PullImage interface. -func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { - imagePuller := &serializedImagePuller{ - recorder: recorder, - runtime: runtime, - backOff: imageBackOff, - pullRequests: make(chan *imagePullRequest, 10), +func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { + return &imagePuller{ + recorder: recorder, + runtime: runtime, + backOff: imageBackOff, } - go util.Until(imagePuller.pullImages, time.Second, util.NeverStop) - return imagePuller } // shouldPullImage returns whether we should pull an image according to @@ -74,7 +63,7 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool { } // records an event using ref, event msg. log to glog using prefix, msg, logFn -func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) { +func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) { if ref != nil { puller.recorder.Eventf(ref, event, msg) } else { @@ -83,7 +72,7 @@ func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, pref } // PullImage pulls the image for the specified pod and container. -func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { +func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) ref, err := GenerateContainerRef(pod, container) if err != nil { @@ -116,18 +105,8 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info) return ErrImagePullBackOff, msg } - - // enqueue image pull request and wait for response. - returnChan := make(chan error) - puller.pullRequests <- &imagePullRequest{ - spec: spec, - container: container, - pullSecrets: pullSecrets, - logPrefix: logPrefix, - ref: ref, - returnChan: returnChan, - } - if err = <-returnChan; err != nil { + puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) + if err := puller.runtime.PullImage(spec, pullSecrets); err != nil { puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) if err == RegistryUnavailable { @@ -141,10 +120,3 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont puller.backOff.GC() return nil, "" } - -func (puller *serializedImagePuller) pullImages() { - for pullRequest := range puller.pullRequests { - puller.logIt(pullRequest.ref, "Pulling", pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info) - pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets) - } -} diff --git a/pkg/kubelet/container/image_puller_test.go b/pkg/kubelet/container/image_puller_test.go index 432ba1ef059..df48428fa56 100644 --- a/pkg/kubelet/container/image_puller_test.go +++ b/pkg/kubelet/container/image_puller_test.go @@ -103,7 +103,7 @@ func TestPuller(t *testing.T) { fakeRuntime := &FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) + puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff) fakeRuntime.ImageList = []Image{{"present_image", nil, 0}} fakeRuntime.Err = c.pullerErr @@ -115,5 +115,6 @@ func TestPuller(t *testing.T) { fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } + } } diff --git a/pkg/kubelet/container/serialized_image_puller.go b/pkg/kubelet/container/serialized_image_puller.go new file mode 100644 index 00000000000..7802e165abc --- /dev/null +++ b/pkg/kubelet/container/serialized_image_puller.go @@ -0,0 +1,140 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "fmt" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/util" +) + +type imagePullRequest struct { + spec ImageSpec + container *api.Container + pullSecrets []api.Secret + logPrefix string + ref *api.ObjectReference + returnChan chan<- error +} + +// serializedImagePuller pulls the image using Runtime.PullImage(). +// It will check the presence of the image, and report the 'image pulling', +// 'image pulled' events correspondingly. +type serializedImagePuller struct { + recorder record.EventRecorder + runtime Runtime + backOff *util.Backoff + pullRequests chan *imagePullRequest +} + +// enforce compatibility. +var _ ImagePuller = &serializedImagePuller{} + +// NewSerializedImagePuller takes an event recorder and container runtime to create a +// image puller that wraps the container runtime's PullImage interface. +// Pulls one image at a time. +// Issue #10959 has the rationale behind serializing image pulls. +func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { + imagePuller := &serializedImagePuller{ + recorder: recorder, + runtime: runtime, + backOff: imageBackOff, + pullRequests: make(chan *imagePullRequest, 10), + } + go util.Until(imagePuller.pullImages, time.Second, util.NeverStop) + return imagePuller +} + +// records an event using ref, event msg. log to glog using prefix, msg, logFn +func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) { + if ref != nil { + puller.recorder.Eventf(ref, event, msg) + } else { + logFn(fmt.Sprint(prefix, " ", msg)) + } +} + +// PullImage pulls the image for the specified pod and container. +func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { + logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) + ref, err := GenerateContainerRef(pod, container) + if err != nil { + glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) + } + + spec := ImageSpec{container.Image} + present, err := puller.runtime.IsImagePresent(spec) + if err != nil { + msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) + puller.logIt(ref, "Failed", logPrefix, msg, glog.Warning) + return ErrImageInspect, msg + } + + if !shouldPullImage(container, present) { + if present { + msg := fmt.Sprintf("Container image %q already present on machine", container.Image) + puller.logIt(ref, "Pulled", logPrefix, msg, glog.Info) + return nil, "" + } else { + msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) + puller.logIt(ref, "ErrImageNeverPull", logPrefix, msg, glog.Warning) + return ErrImageNeverPull, msg + } + } + + backOffKey := fmt.Sprintf("%s_%s", pod.Name, container.Image) + if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { + msg := fmt.Sprintf("Back-off pulling image %q", container.Image) + puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info) + return ErrImagePullBackOff, msg + } + + // enqueue image pull request and wait for response. + returnChan := make(chan error) + puller.pullRequests <- &imagePullRequest{ + spec: spec, + container: container, + pullSecrets: pullSecrets, + logPrefix: logPrefix, + ref: ref, + returnChan: returnChan, + } + if err = <-returnChan; err != nil { + puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) + puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) + if err == RegistryUnavailable { + msg := fmt.Sprintf("image pull failed for %s because the registry is temporarily unavailable.", container.Image) + return err, msg + } else { + return ErrImagePull, err.Error() + } + } + puller.logIt(ref, "Pulled", logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) + puller.backOff.GC() + return nil, "" +} + +func (puller *serializedImagePuller) pullImages() { + for pullRequest := range puller.pullRequests { + puller.logIt(pullRequest.ref, "Pulling", pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info) + pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets) + } +} diff --git a/pkg/kubelet/container/serialized_image_puller_test.go b/pkg/kubelet/container/serialized_image_puller_test.go new file mode 100644 index 00000000000..9313dfc09e6 --- /dev/null +++ b/pkg/kubelet/container/serialized_image_puller_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/util" +) + +func TestSerializedPuller(t *testing.T) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "test_pod", + Namespace: "test-ns", + UID: "bar", + ResourceVersion: "42", + SelfLink: "/api/v1/pods/foo", + }} + + cases := []struct { + containerImage string + policy api.PullPolicy + calledFunctions []string + inspectErr error + pullerErr error + expectedErr []error + }{ + { // pull missing image + containerImage: "missing_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil}}, + + { // image present, dont pull + containerImage: "present_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil, nil, nil}}, + // image present, pull it + {containerImage: "present_image", + policy: api.PullAlways, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil, nil, nil}}, + // missing image, error PullNever + {containerImage: "missing_image", + policy: api.PullNever, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}}, + // missing image, unable to inspect + {containerImage: "missing_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: errors.New("unknown inspectError"), + pullerErr: nil, + expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}}, + // missing image, unable to fetch + {containerImage: "typo_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: errors.New("404"), + expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}}, + } + + for i, c := range cases { + container := &api.Container{ + Name: "container_name", + Image: c.containerImage, + ImagePullPolicy: c.policy, + } + + backOff := util.NewBackOff(time.Second, time.Minute) + fakeClock := &util.FakeClock{Time: time.Now()} + backOff.Clock = fakeClock + + fakeRuntime := &FakeRuntime{} + fakeRecorder := &record.FakeRecorder{} + puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) + + fakeRuntime.ImageList = []Image{{"present_image", nil, 0}} + fakeRuntime.Err = c.pullerErr + fakeRuntime.InspectErr = c.inspectErr + + for tick, expected := range c.expectedErr { + fakeClock.Step(time.Second) + err, _ := puller.PullImage(pod, container, nil) + fakeRuntime.AssertCalls(c.calledFunctions) + assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) + } + + } +} diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 16a1d5fe73d..04e6a014ed2 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -47,7 +47,7 @@ func NewFakeDockerManager( fakeProcFs := procfs.NewFakeProcFs() dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, - fakeOOMAdjuster, fakeProcFs, false, imageBackOff) + fakeOOMAdjuster, fakeProcFs, false, imageBackOff, true) dm.dockerPuller = &FakeDockerPuller{} return dm } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 78d8b7a4e27..1164efec7bf 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -161,7 +161,8 @@ func NewDockerManager( oomAdjuster *oom.OOMAdjuster, procFs procfs.ProcFsInterface, cpuCFSQuota bool, - imageBackOff *util.Backoff) *DockerManager { + imageBackOff *util.Backoff, + serializeImagePulls bool) *DockerManager { // Work out the location of the Docker runtime, defaulting to /var/lib/docker // if there are any problems. @@ -215,7 +216,11 @@ func NewDockerManager( cpuCFSQuota: cpuCFSQuota, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) - dm.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, dm, imageBackOff) + if serializeImagePulls { + dm.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, dm, imageBackOff) + } else { + dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff) + } dm.containerGC = NewContainerGC(client, containerLogsDir) return dm diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3c2f05a8585..b5784461cee 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -184,7 +184,9 @@ func NewMainKubelet( resolverConfig string, cpuCFSQuota bool, daemonEndpoints *api.NodeDaemonEndpoints, - oomAdjuster *oom.OOMAdjuster) (*Kubelet, error) { + oomAdjuster *oom.OOMAdjuster, + serializeImagePulls bool, +) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -335,7 +337,9 @@ func NewMainKubelet( oomAdjuster, procFs, klet.cpuCFSQuota, - imageBackOff) + imageBackOff, + serializeImagePulls, + ) case "rkt": conf := &rkt.Config{ @@ -350,7 +354,9 @@ func NewMainKubelet( containerRefManager, klet.livenessManager, klet.volumeManager, - imageBackOff) + imageBackOff, + serializeImagePulls, + ) if err != nil { return nil, err } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index bcc0d9655a7..93cdbbcd818 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -109,8 +109,9 @@ func New(config *Config, containerRefManager *kubecontainer.RefManager, livenessManager proberesults.Manager, volumeGetter volumeGetter, - imageBackOff *util.Backoff) (*Runtime, error) { - + imageBackOff *util.Backoff, + serializeImagePulls bool, +) (*Runtime, error) { systemdVersion, err := getSystemdVersion() if err != nil { return nil, err @@ -149,7 +150,11 @@ func New(config *Config, livenessManager: livenessManager, volumeGetter: volumeGetter, } - rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff) + if serializeImagePulls { + rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff) + } else { + rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) + } // Test the rkt version. version, err := rkt.Version()