kubelet/images: return image ref in EnsureImageExists

This commit is contained in:
Pengfei Ni 2016-12-29 15:24:19 +08:00
parent ba5a684c4c
commit 37fc81be0e
7 changed files with 49 additions and 34 deletions

View File

@ -94,7 +94,7 @@ type ImageManagerService interface {
// ImageStatus returns the status of the image. // ImageStatus returns the status of the image.
ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error) ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error)
// PullImage pulls an image with the authentication config. // PullImage pulls an image with the authentication config.
PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) error PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error)
// RemoveImage removes the image. // RemoveImage removes the image.
RemoveImage(image *runtimeapi.ImageSpec) error RemoveImage(image *runtimeapi.ImageSpec) error
} }

View File

@ -91,7 +91,7 @@ func (r *FakeImageService) ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi
return r.Images[image.GetImage()], nil return r.Images[image.GetImage()], nil
} }
func (r *FakeImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) error { func (r *FakeImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -104,7 +104,7 @@ func (r *FakeImageService) PullImage(image *runtimeapi.ImageSpec, auth *runtimea
r.Images[imageID] = r.makeFakeImage(image.GetImage()) r.Images[imageID] = r.makeFakeImage(image.GetImage())
} }
return nil return imageID, nil
} }
func (r *FakeImageService) RemoveImage(image *runtimeapi.ImageSpec) error { func (r *FakeImageService) RemoveImage(image *runtimeapi.ImageSpec) error {

View File

@ -42,9 +42,9 @@ type throttledImageService struct {
limiter flowcontrol.RateLimiter limiter flowcontrol.RateLimiter
} }
func (ts throttledImageService) PullImage(image kubecontainer.ImageSpec, secrets []v1.Secret) error { func (ts throttledImageService) PullImage(image kubecontainer.ImageSpec, secrets []v1.Secret) (string, error) {
if ts.limiter.TryAccept() { if ts.limiter.TryAccept() {
return ts.ImageService.PullImage(image, secrets) return ts.ImageService.PullImage(image, secrets)
} }
return fmt.Errorf("pull QPS exceeded.") return "", fmt.Errorf("pull QPS exceeded.")
} }

View File

@ -81,8 +81,9 @@ func (m *imageManager) logIt(ref *v1.ObjectReference, eventtype, event, prefix,
} }
} }
// EnsureImageExists pulls the image for the specified pod and container. // EnsureImageExists pulls the image for the specified pod and container, and returnsIt returns
func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (error, string) { // (imageRef, error message, error).
func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (string, string, error) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := kubecontainer.GenerateContainerRef(pod, container) ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil { if err != nil {
@ -94,26 +95,26 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed to apply default image tag %q: %v", container.Image, err) msg := fmt.Sprintf("Failed to apply default image tag %q: %v", container.Image, err)
m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
return ErrInvalidImageName, msg return "", msg, ErrInvalidImageName
} }
spec := kubecontainer.ImageSpec{Image: image} spec := kubecontainer.ImageSpec{Image: image}
present, err := m.imageService.IsImagePresent(spec) imageRef, err := m.imageService.IsImagePresent(spec)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
return ErrImageInspect, msg return "", msg, ErrImageInspect
} }
if !shouldPullImage(container, present) { if !shouldPullImage(container, imageRef != "") {
if present { if imageRef != "" {
msg := fmt.Sprintf("Container image %q already present on machine", container.Image) msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info) m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info)
return nil, "" return imageRef, "", nil
} else { } else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
m.logIt(ref, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) m.logIt(ref, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg return "", msg, ErrImageNeverPull
} }
} }
@ -121,24 +122,25 @@ func (m *imageManager) EnsureImageExists(pod *v1.Pod, container *v1.Container, p
if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) { if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image) msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg return "", msg, ErrImagePullBackOff
} }
m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
errChan := make(chan error) pullChan := make(chan imageRefWithError)
m.puller.pullImage(spec, pullSecrets, errChan) m.puller.pullImage(spec, pullSecrets, pullChan)
if err := <-errChan; err != nil { imageRefWithErr := <-pullChan
m.logIt(ref, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) if imageRefWithErr.err != nil {
m.logIt(ref, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, imageRefWithErr.err), glog.Warning)
m.backOff.Next(backOffKey, m.backOff.Clock.Now()) m.backOff.Next(backOffKey, m.backOff.Clock.Now())
if err == RegistryUnavailable { if imageRefWithErr.err == RegistryUnavailable {
msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image)
return err, msg return "", msg, imageRefWithErr.err
} else {
return ErrImagePull, err.Error()
} }
return "", imageRefWithErr.err.Error(), ErrImagePull
} }
m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
m.backOff.GC() m.backOff.GC()
return nil, "" return imageRefWithErr.imageRef, "", nil
} }
// applyDefaultImageTag parses a docker image string, if it doesn't contain any tag or digest, // applyDefaultImageTag parses a docker image string, if it doesn't contain any tag or digest,

View File

@ -126,7 +126,7 @@ func TestParallelPuller(t *testing.T) {
for tick, expected := range c.expectedErr { for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
err, _ := puller.EnsureImageExists(pod, container, nil) _, _, err := puller.EnsureImageExists(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions) fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
} }
@ -150,7 +150,7 @@ func TestSerializedPuller(t *testing.T) {
for tick, expected := range c.expectedErr { for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
err, _ := puller.EnsureImageExists(pod, container, nil) _, _, err := puller.EnsureImageExists(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions) fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
} }

View File

@ -24,8 +24,13 @@ import (
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
type imageRefWithError struct {
imageRef string
err error
}
type imagePuller interface { type imagePuller interface {
pullImage(kubecontainer.ImageSpec, []v1.Secret, chan<- error) pullImage(kubecontainer.ImageSpec, []v1.Secret, chan<- imageRefWithError)
} }
var _, _ imagePuller = &parallelImagePuller{}, &serialImagePuller{} var _, _ imagePuller = &parallelImagePuller{}, &serialImagePuller{}
@ -38,9 +43,13 @@ func newParallelImagePuller(imageService kubecontainer.ImageService) imagePuller
return &parallelImagePuller{imageService} return &parallelImagePuller{imageService}
} }
func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, errChan chan<- error) { func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- imageRefWithError) {
go func() { go func() {
errChan <- pip.imageService.PullImage(spec, pullSecrets) imageRef, err := pip.imageService.PullImage(spec, pullSecrets)
pullChan <- imageRefWithError{
imageRef: imageRef,
err: err,
}
}() }()
} }
@ -61,19 +70,23 @@ func newSerialImagePuller(imageService kubecontainer.ImageService) imagePuller {
type imagePullRequest struct { type imagePullRequest struct {
spec kubecontainer.ImageSpec spec kubecontainer.ImageSpec
pullSecrets []v1.Secret pullSecrets []v1.Secret
errChan chan<- error pullChan chan<- imageRefWithError
} }
func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, errChan chan<- error) { func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- imageRefWithError) {
sip.pullRequests <- &imagePullRequest{ sip.pullRequests <- &imagePullRequest{
spec: spec, spec: spec,
pullSecrets: pullSecrets, pullSecrets: pullSecrets,
errChan: errChan, pullChan: pullChan,
} }
} }
func (sip *serialImagePuller) processImagePullRequests() { func (sip *serialImagePuller) processImagePullRequests() {
for pullRequest := range sip.pullRequests { for pullRequest := range sip.pullRequests {
pullRequest.errChan <- sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets) imageRef, err := sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets)
pullRequest.pullChan <- imageRefWithError{
imageRef: imageRef,
err: err,
}
} }
} }

View File

@ -49,7 +49,7 @@ var (
// Implementations are expected to be thread safe. // Implementations are expected to be thread safe.
type ImageManager interface { type ImageManager interface {
// EnsureImageExists ensures that image specified in `container` exists. // EnsureImageExists ensures that image specified in `container` exists.
EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (error, string) EnsureImageExists(pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret) (string, string, error)
// TODO(ronl): consolidating image managing and deleting operation in this interface // TODO(ronl): consolidating image managing and deleting operation in this interface
} }