diff --git a/pkg/kubelet/images/parallel_image_puller.go b/pkg/kubelet/images/image_manager.go similarity index 56% rename from pkg/kubelet/images/parallel_image_puller.go rename to pkg/kubelet/images/image_manager.go index 09fd47a04f8..5e17cb2c924 100644 --- a/pkg/kubelet/images/parallel_image_puller.go +++ b/pkg/kubelet/images/image_manager.go @@ -27,25 +27,29 @@ import ( "k8s.io/kubernetes/pkg/util/flowcontrol" ) -// imagePuller pulls the image using Runtime.PullImage(). -// It will check the presence of the image, and report the 'image pulling', -// 'image pulled' events correspondingly. -type parallelImagePuller struct { +// imageManager provides the functionalities for image pulling. +type imageManager struct { recorder record.EventRecorder runtime kubecontainer.Runtime backOff *flowcontrol.Backoff + // It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly. + puller imagePuller } -// enforce compatibility. -var _ imagePuller = ¶llelImagePuller{} +var _ ImageManager = &imageManager{} -// NewImagePuller takes an event recorder and container runtime to create a -// image puller that wraps the container runtime's PullImage interface. -func newParallelImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { - return ¶llelImagePuller{ +func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager { + var puller imagePuller + if serialized { + puller = newSerialImagePuller(runtime) + } else { + puller = newParallelImagePuller(runtime) + } + return &imageManager{ recorder: recorder, runtime: runtime, backOff: imageBackOff, + puller: puller, } } @@ -65,16 +69,16 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool { } // records an event using ref, event msg. log to glog using prefix, msg, logFn -func (puller *parallelImagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) { +func (m *imageManager) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) { if ref != nil { - puller.recorder.Event(ref, eventtype, event, msg) + m.recorder.Event(ref, eventtype, event, msg) } else { logFn(fmt.Sprint(prefix, " ", msg)) } } -// PullImage pulls the image for the specified pod and container. -func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { +// EnsureImageExists pulls the image for the specified pod and container. +func (m *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) ref, err := kubecontainer.GenerateContainerRef(pod, container) if err != nil { @@ -82,35 +86,37 @@ func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Contai } spec := kubecontainer.ImageSpec{Image: container.Image} - present, err := puller.runtime.IsImagePresent(spec) + present, err := m.runtime.IsImagePresent(spec) if err != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) - puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) + m.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) return ErrImageInspect, msg } if !shouldPullImage(container, present) { if present { msg := fmt.Sprintf("Container image %q already present on machine", container.Image) - puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info) + m.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info) return nil, "" } else { msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) - puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) + m.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) return ErrImageNeverPull, msg } } backOffKey := fmt.Sprintf("%s_%s", pod.UID, container.Image) - if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { + if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) { msg := fmt.Sprintf("Back-off pulling image %q", container.Image) - puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) + m.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) return ErrImagePullBackOff, msg } - puller.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) - if err := puller.runtime.PullImage(spec, pullSecrets); err != nil { - puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) - puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) + m.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) + errChan := make(chan error) + m.puller.pullImage(spec, pullSecrets, errChan) + if err := <-errChan; err != nil { + m.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) + m.backOff.Next(backOffKey, m.backOff.Clock.Now()) if err == RegistryUnavailable { msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) return err, msg @@ -118,7 +124,7 @@ func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Contai return ErrImagePull, err.Error() } } - puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) - puller.backOff.GC() + m.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) + m.backOff.GC() return nil, "" } diff --git a/pkg/kubelet/images/serialized_image_puller_test.go b/pkg/kubelet/images/image_manager_test.go similarity index 54% rename from pkg/kubelet/images/serialized_image_puller_test.go rename to pkg/kubelet/images/image_manager_test.go index ef6eb1264b0..84309edc492 100644 --- a/pkg/kubelet/images/serialized_image_puller_test.go +++ b/pkg/kubelet/images/image_manager_test.go @@ -30,6 +30,97 @@ import ( "k8s.io/kubernetes/pkg/util/flowcontrol" ) +func TestParallelPuller(t *testing.T) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "test_pod", + Namespace: "test-ns", + UID: "bar", + ResourceVersion: "42", + SelfLink: "/api/v1/pods/foo", + }} + + cases := []struct { + containerImage string + policy api.PullPolicy + calledFunctions []string + inspectErr error + pullerErr error + expectedErr []error + }{ + { // pull missing image + containerImage: "missing_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil}}, + + { // image present, dont pull + containerImage: "present_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil, nil, nil}}, + // image present, pull it + {containerImage: "present_image", + policy: api.PullAlways, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{nil, nil, nil}}, + // missing image, error PullNever + {containerImage: "missing_image", + policy: api.PullNever, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: nil, + pullerErr: nil, + expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}}, + // missing image, unable to inspect + {containerImage: "missing_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent"}, + inspectErr: errors.New("unknown inspectError"), + pullerErr: nil, + expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}}, + // missing image, unable to fetch + {containerImage: "typo_image", + policy: api.PullIfNotPresent, + calledFunctions: []string{"IsImagePresent", "PullImage"}, + inspectErr: nil, + pullerErr: errors.New("404"), + expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}}, + } + + for i, c := range cases { + container := &api.Container{ + Name: "container_name", + Image: c.containerImage, + ImagePullPolicy: c.policy, + } + + backOff := flowcontrol.NewBackOff(time.Second, time.Minute) + fakeClock := clock.NewFakeClock(time.Now()) + backOff.Clock = fakeClock + + fakeRuntime := &ctest.FakeRuntime{} + fakeRecorder := &record.FakeRecorder{} + puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, false) + + fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}} + fakeRuntime.Err = c.pullerErr + fakeRuntime.InspectErr = c.inspectErr + + for tick, expected := range c.expectedErr { + fakeClock.Step(time.Second) + err, _ := puller.EnsureImageExists(pod, container, nil) + fakeRuntime.AssertCalls(c.calledFunctions) + assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) + } + } +} + func TestSerializedPuller(t *testing.T) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -106,7 +197,7 @@ func TestSerializedPuller(t *testing.T) { fakeRuntime := &ctest.FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := newSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) + puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, true) fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 0}} fakeRuntime.Err = c.pullerErr @@ -114,10 +205,9 @@ func TestSerializedPuller(t *testing.T) { for tick, expected := range c.expectedErr { fakeClock.Step(time.Second) - err, _ := puller.pullImage(pod, container, nil) + err, _ := puller.EnsureImageExists(pod, container, nil) fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } - } } diff --git a/pkg/kubelet/images/impl.go b/pkg/kubelet/images/impl.go deleted file mode 100644 index 4a00a2e36f8..00000000000 --- a/pkg/kubelet/images/impl.go +++ /dev/null @@ -1,52 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package images - -import ( - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/record" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/util/flowcontrol" -) - -type imageManager struct { - recorder record.EventRecorder - runtime kubecontainer.Runtime - backOff *flowcontrol.Backoff - imagePuller imagePuller -} - -var _ ImageManager = &imageManager{} - -func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager { - var imagePuller imagePuller - if serialized { - imagePuller = newSerializedImagePuller(recorder, runtime, imageBackOff) - } else { - imagePuller = newParallelImagePuller(recorder, runtime, imageBackOff) - } - return &imageManager{ - recorder: recorder, - runtime: runtime, - backOff: imageBackOff, - imagePuller: imagePuller, - } -} - -func (im *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { - return im.imagePuller.pullImage(pod, container, pullSecrets) -} diff --git a/pkg/kubelet/images/parallel_image_puller_test.go b/pkg/kubelet/images/parallel_image_puller_test.go deleted file mode 100644 index 24414f64e66..00000000000 --- a/pkg/kubelet/images/parallel_image_puller_test.go +++ /dev/null @@ -1,123 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package images - -import ( - "errors" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/record" - . "k8s.io/kubernetes/pkg/kubelet/container" - ctest "k8s.io/kubernetes/pkg/kubelet/container/testing" - "k8s.io/kubernetes/pkg/util/clock" - "k8s.io/kubernetes/pkg/util/flowcontrol" -) - -func TestPuller(t *testing.T) { - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "test_pod", - Namespace: "test-ns", - UID: "bar", - ResourceVersion: "42", - SelfLink: "/api/v1/pods/foo", - }} - - cases := []struct { - containerImage string - policy api.PullPolicy - calledFunctions []string - inspectErr error - pullerErr error - expectedErr []error - }{ - { // pull missing image - containerImage: "missing_image", - policy: api.PullIfNotPresent, - calledFunctions: []string{"IsImagePresent", "PullImage"}, - inspectErr: nil, - pullerErr: nil, - expectedErr: []error{nil}}, - - { // image present, don't pull - containerImage: "present_image", - policy: api.PullIfNotPresent, - calledFunctions: []string{"IsImagePresent"}, - inspectErr: nil, - pullerErr: nil, - expectedErr: []error{nil, nil, nil}}, - // image present, pull it - {containerImage: "present_image", - policy: api.PullAlways, - calledFunctions: []string{"IsImagePresent", "PullImage"}, - inspectErr: nil, - pullerErr: nil, - expectedErr: []error{nil, nil, nil}}, - // missing image, error PullNever - {containerImage: "missing_image", - policy: api.PullNever, - calledFunctions: []string{"IsImagePresent"}, - inspectErr: nil, - pullerErr: nil, - expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}}, - // missing image, unable to inspect - {containerImage: "missing_image", - policy: api.PullIfNotPresent, - calledFunctions: []string{"IsImagePresent"}, - inspectErr: errors.New("unknown inspectError"), - pullerErr: nil, - expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}}, - // missing image, unable to fetch - {containerImage: "typo_image", - policy: api.PullIfNotPresent, - calledFunctions: []string{"IsImagePresent", "PullImage"}, - inspectErr: nil, - pullerErr: errors.New("404"), - expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}}, - } - - for i, c := range cases { - container := &api.Container{ - Name: "container_name", - Image: c.containerImage, - ImagePullPolicy: c.policy, - } - - backOff := flowcontrol.NewBackOff(time.Second, time.Minute) - fakeClock := clock.NewFakeClock(time.Now()) - backOff.Clock = fakeClock - - fakeRuntime := &ctest.FakeRuntime{} - fakeRecorder := &record.FakeRecorder{} - puller := newParallelImagePuller(fakeRecorder, fakeRuntime, backOff) - - fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}} - fakeRuntime.Err = c.pullerErr - fakeRuntime.InspectErr = c.inspectErr - - for tick, expected := range c.expectedErr { - fakeClock.Step(time.Second) - err, _ := puller.pullImage(pod, container, nil) - fakeRuntime.AssertCalls(c.calledFunctions) - assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) - } - - } -} diff --git a/pkg/kubelet/images/puller.go b/pkg/kubelet/images/puller.go new file mode 100644 index 00000000000..68a70ed0e83 --- /dev/null +++ b/pkg/kubelet/images/puller.go @@ -0,0 +1,79 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package images + +import ( + "time" + + "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/wait" +) + +type imagePuller interface { + pullImage(kubecontainer.ImageSpec, []api.Secret, chan<- error) +} + +var _, _ imagePuller = ¶llelImagePuller{}, &serialImagePuller{} + +type parallelImagePuller struct { + runtime kubecontainer.Runtime +} + +func newParallelImagePuller(runtime kubecontainer.Runtime) imagePuller { + return ¶llelImagePuller{runtime} +} + +func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []api.Secret, errChan chan<- error) { + go func() { + errChan <- pip.runtime.PullImage(spec, pullSecrets) + }() +} + +// Maximum number of image pull requests than can be queued. +const maxImagePullRequests = 10 + +type serialImagePuller struct { + runtime kubecontainer.Runtime + pullRequests chan *imagePullRequest +} + +func newSerialImagePuller(runtime kubecontainer.Runtime) imagePuller { + imagePuller := &serialImagePuller{runtime, make(chan *imagePullRequest, maxImagePullRequests)} + go wait.Until(imagePuller.processImagePullRequests, time.Second, wait.NeverStop) + return imagePuller +} + +type imagePullRequest struct { + spec kubecontainer.ImageSpec + pullSecrets []api.Secret + errChan chan<- error +} + +func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []api.Secret, errChan chan<- error) { + sip.pullRequests <- &imagePullRequest{ + spec: spec, + pullSecrets: pullSecrets, + errChan: errChan, + } +} + +func (sip *serialImagePuller) processImagePullRequests() { + for pullRequest := range sip.pullRequests { + pullRequest.errChan <- sip.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets) + } +} diff --git a/pkg/kubelet/images/serialized_image_puller.go b/pkg/kubelet/images/serialized_image_puller.go deleted file mode 100644 index b08c43fefb5..00000000000 --- a/pkg/kubelet/images/serialized_image_puller.go +++ /dev/null @@ -1,143 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package images - -import ( - "fmt" - "time" - - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/record" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/events" - "k8s.io/kubernetes/pkg/util/flowcontrol" - "k8s.io/kubernetes/pkg/util/wait" -) - -type imagePullRequest struct { - spec kubecontainer.ImageSpec - container *api.Container - pullSecrets []api.Secret - logPrefix string - ref *api.ObjectReference - returnChan chan<- error -} - -// serializedImagePuller pulls the image using Runtime.PullImage(). -// It will check the presence of the image, and report the 'image pulling', -// 'image pulled' events correspondingly. -type serializedImagePuller struct { - recorder record.EventRecorder - runtime kubecontainer.Runtime - backOff *flowcontrol.Backoff - pullRequests chan *imagePullRequest -} - -// enforce compatibility. -var _ imagePuller = &serializedImagePuller{} - -// NewSerializedImagePuller takes an event recorder and container runtime to create a -// image puller that wraps the container runtime's PullImage interface. -// Pulls one image at a time. -// Issue #10959 has the rationale behind serializing image pulls. -func newSerializedImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { - imagePuller := &serializedImagePuller{ - recorder: recorder, - runtime: runtime, - backOff: imageBackOff, - pullRequests: make(chan *imagePullRequest, 10), - } - go wait.Until(imagePuller.pullImages, time.Second, wait.NeverStop) - return imagePuller -} - -// records an event using ref, event msg. log to glog using prefix, msg, logFn -func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) { - if ref != nil { - puller.recorder.Event(ref, eventtype, event, msg) - } else { - logFn(fmt.Sprint(prefix, " ", msg)) - } -} - -// PullImage pulls the image for the specified pod and container. -func (puller *serializedImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { - logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) - ref, err := kubecontainer.GenerateContainerRef(pod, container) - if err != nil { - glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) - } - - spec := kubecontainer.ImageSpec{Image: container.Image} - present, err := puller.runtime.IsImagePresent(spec) - if err != nil { - msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) - puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) - return ErrImageInspect, msg - } - - if !shouldPullImage(container, present) { - if present { - msg := fmt.Sprintf("Container image %q already present on machine", container.Image) - puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info) - return nil, "" - } else { - msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) - puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) - return ErrImageNeverPull, msg - } - } - - backOffKey := fmt.Sprintf("%s_%s", pod.UID, container.Image) - if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { - msg := fmt.Sprintf("Back-off pulling image %q", container.Image) - puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) - return ErrImagePullBackOff, msg - } - - // enqueue image pull request and wait for response. - returnChan := make(chan error) - puller.pullRequests <- &imagePullRequest{ - spec: spec, - container: container, - pullSecrets: pullSecrets, - logPrefix: logPrefix, - ref: ref, - returnChan: returnChan, - } - if err = <-returnChan; err != nil { - puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) - puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) - if err == RegistryUnavailable { - msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) - return err, msg - } else { - return ErrImagePull, err.Error() - } - } - puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) - puller.backOff.GC() - return nil, "" -} - -func (puller *serializedImagePuller) pullImages() { - for pullRequest := range puller.pullRequests { - puller.logIt(pullRequest.ref, api.EventTypeNormal, events.PullingImage, pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info) - pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets) - } -} diff --git a/pkg/kubelet/images/types.go b/pkg/kubelet/images/types.go index 4c79110fb65..ab16bc69767 100644 --- a/pkg/kubelet/images/types.go +++ b/pkg/kubelet/images/types.go @@ -50,10 +50,3 @@ type ImageManager interface { // TODO(ronl): consolidating image managing and deleting operation in this interface } - -// ImagePuller wraps Runtime.PullImage() to pull a container image. -// It will check the presence of the image, and report the 'image pulling', -// 'image pulled' events correspondingly. -type imagePuller interface { - pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) -}