diff --git a/pkg/kubelet/api/services.go b/pkg/kubelet/api/services.go index a1672cccc4f..f3febd1ebac 100644 --- a/pkg/kubelet/api/services.go +++ b/pkg/kubelet/api/services.go @@ -94,7 +94,7 @@ type ImageManagerService interface { // ImageStatus returns the status of the image. ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error) // PullImage pulls an image with the authentication config. - PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) error + PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error) // RemoveImage removes the image. RemoveImage(image *runtimeapi.ImageSpec) error } diff --git a/pkg/kubelet/api/testing/fake_image_service.go b/pkg/kubelet/api/testing/fake_image_service.go index 22eb9e4fa2f..6bd243afb46 100644 --- a/pkg/kubelet/api/testing/fake_image_service.go +++ b/pkg/kubelet/api/testing/fake_image_service.go @@ -91,7 +91,7 @@ func (r *FakeImageService) ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi return r.Images[image.GetImage()], nil } -func (r *FakeImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) error { +func (r *FakeImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error) { r.Lock() defer r.Unlock() @@ -104,7 +104,7 @@ func (r *FakeImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimea r.Images[imageID] = r.makeFakeImage(image.GetImage()) } - return nil + return imageID, nil } func (r *FakeImageService) RemoveImage(image *runtimeapi.ImageSpec) error { diff --git a/pkg/kubelet/images/helpers.go b/pkg/kubelet/images/helpers.go index 67a8da94ff3..f7329fba15e 100644 --- a/pkg/kubelet/images/helpers.go +++ b/pkg/kubelet/images/helpers.go @@ -42,9 +42,9 @@ type throttledImageService struct { limiter flowcontrol.RateLimiter } -func (ts throttledImageService) PullImage(image kubecontainer.ImageSpec, secrets []v1.Secret) error { +func (ts throttledImageService) PullImage(image kubecontainer.ImageSpec, secrets []v1.Secret) (string, error) { if ts.limiter.TryAccept() { return ts.ImageService.PullImage(image, secrets) } - return fmt.Errorf("pull QPS exceeded.") + return "", fmt.Errorf("pull QPS exceeded.") } diff --git a/pkg/kubelet/images/image_manager.go b/pkg/kubelet/images/image_manager.go index b2809f1353a..e4ba9c4c02e 100644 --- a/pkg/kubelet/images/image_manager.go +++ b/pkg/kubelet/images/image_manager.go @@ -81,8 +81,9 @@ func (m *imageManager) logIt(ref *v1.ObjectReference, eventtype, event, prefix, } } -// EnsureImageExists pulls the image for the specified pod and container. -func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (error, string) { +// EnsureImageExists pulls the image for the specified pod and container, and returnsIt returns +// (imageRef, error message, error). +func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (string, string, error) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) ref, err := kubecontainer.GenerateContainerRef(pod, container) if err != nil { @@ -94,26 +95,26 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p if err != nil { msg := fmt.Sprintf("Failed to apply default image tag %q: %v", container.Image, err) m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) - return ErrInvalidImageName, msg + return "", msg, ErrInvalidImageName } spec := kubecontainer.ImageSpec{Image: image} - present, err := m.imageService.IsImagePresent(spec) + imageRef, err := m.imageService.IsImagePresent(spec) if err != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) - return ErrImageInspect, msg + return "", msg, ErrImageInspect } - if !shouldPullImage(container, present) { - if present { + if !shouldPullImage(container, imageRef != "") { + if imageRef != "" { msg := fmt.Sprintf("Container image %q already present on machine", container.Image) m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info) - return nil, "" + return imageRef, "", nil } else { msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) m.logIt(ref, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) - return ErrImageNeverPull, msg + return "", msg, ErrImageNeverPull } } @@ -121,24 +122,25 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) { msg := fmt.Sprintf("Back-off pulling image %q", container.Image) m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) - return ErrImagePullBackOff, msg + return "", msg, ErrImagePullBackOff } m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) - errChan := make(chan error) - m.puller.pullImage(spec, pullSecrets, errChan) - if err := <-errChan; err != nil { - m.logIt(ref, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) + pullChan := make(chan imageRefWithError) + m.puller.pullImage(spec, pullSecrets, pullChan) + imageRefWithErr := <-pullChan + if imageRefWithErr.err != nil { + m.logIt(ref, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, imageRefWithErr.err), glog.Warning) m.backOff.Next(backOffKey, m.backOff.Clock.Now()) - if err == RegistryUnavailable { + if imageRefWithErr.err == RegistryUnavailable { msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) - return err, msg - } else { - return ErrImagePull, err.Error() + return "", msg, imageRefWithErr.err } + + return "", imageRefWithErr.err.Error(), ErrImagePull } m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) m.backOff.GC() - return nil, "" + return imageRefWithErr.imageRef, "", nil } // applyDefaultImageTag parses a docker image string, if it doesn't contain any tag or digest, diff --git a/pkg/kubelet/images/image_manager_test.go b/pkg/kubelet/images/image_manager_test.go index e339a7dec59..356be242b25 100644 --- a/pkg/kubelet/images/image_manager_test.go +++ b/pkg/kubelet/images/image_manager_test.go @@ -126,7 +126,7 @@ func TestParallelPuller(t *testing.T) { for tick, expected := range c.expectedErr { fakeClock.Step(time.Second) - err, _ := puller.EnsureImageExists(pod, container, nil) + _, _, err := puller.EnsureImageExists(pod, container, nil) fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } @@ -150,7 +150,7 @@ func TestSerializedPuller(t *testing.T) { for tick, expected := range c.expectedErr { fakeClock.Step(time.Second) - err, _ := puller.EnsureImageExists(pod, container, nil) + _, _, err := puller.EnsureImageExists(pod, container, nil) fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } diff --git a/pkg/kubelet/images/puller.go b/pkg/kubelet/images/puller.go index 5698e4c2664..6ff1d7cb239 100644 --- a/pkg/kubelet/images/puller.go +++ b/pkg/kubelet/images/puller.go @@ -24,8 +24,13 @@ import ( "k8s.io/kubernetes/pkg/util/wait" ) +type imageRefWithError struct { + imageRef string + err error +} + type imagePuller interface { - pullImage(kubecontainer.ImageSpec, []v1.Secret, chan<- error) + pullImage(kubecontainer.ImageSpec, []v1.Secret, chan<- imageRefWithError) } var _, _ imagePuller = ¶llelImagePuller{}, &serialImagePuller{} @@ -38,9 +43,13 @@ func newParallelImagePuller(imageService kubecontainer.ImageService) imagePuller return ¶llelImagePuller{imageService} } -func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, errChan chan<- error) { +func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- imageRefWithError) { go func() { - errChan <- pip.imageService.PullImage(spec, pullSecrets) + imageRef, err := pip.imageService.PullImage(spec, pullSecrets) + pullChan <- imageRefWithError{ + imageRef: imageRef, + err: err, + } }() } @@ -61,19 +70,23 @@ func newSerialImagePuller(imageService kubecontainer.ImageService) imagePuller { type imagePullRequest struct { spec kubecontainer.ImageSpec pullSecrets []v1.Secret - errChan chan<- error + pullChan chan<- imageRefWithError } -func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, errChan chan<- error) { +func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- imageRefWithError) { sip.pullRequests <- &imagePullRequest{ spec: spec, pullSecrets: pullSecrets, - errChan: errChan, + pullChan: pullChan, } } func (sip *serialImagePuller) processImagePullRequests() { for pullRequest := range sip.pullRequests { - pullRequest.errChan <- sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets) + imageRef, err := sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets) + pullRequest.pullChan <- imageRefWithError{ + imageRef: imageRef, + err: err, + } } } diff --git a/pkg/kubelet/images/types.go b/pkg/kubelet/images/types.go index 9870088f948..8eaef304afd 100644 --- a/pkg/kubelet/images/types.go +++ b/pkg/kubelet/images/types.go @@ -49,7 +49,7 @@ var ( // Implementations are expected to be thread safe. type ImageManager interface { // EnsureImageExists ensures that image specified in `container` exists. - EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (error, string) + EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (string, string, error) // TODO(ronl): consolidating image managing and deleting operation in this interface }