Wrapping image pullers inside ImageManager

This commit is contained in:
Ron Lai 2016-07-14 15:54:02 -07:00
parent 56b9daf50f
commit 367a683273
7 changed files with 65 additions and 46 deletions

View File

@ -1,9 +1,12 @@
/* /*
Copyright 2016 The Kubernetes Authors. Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -1,10 +1,12 @@
/* /*
Copyright 2016 The Kubernetes Authors All. Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -14,28 +16,37 @@ limitations under the License.
package images package images
type ImageManager struct { import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
type imageManager struct {
recorder record.EventRecorder recorder record.EventRecorder
runtime container.Runtime runtime kubecontainer.Runtime
backOff *flowcontrol.Backoff backOff *flowcontrol.Backoff
imagePuller imagePuller imagePuller imagePuller
} }
func NewImageManager(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager { var _ ImageManager = &imageManager{}
func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager {
var imagePuller imagePuller var imagePuller imagePuller
if serialized { if serialized {
imagePuller = NewSerializedImagePuller(recorder, runtime, imageBackOff) imagePuller = newSerializedImagePuller(recorder, runtime, imageBackOff)
} else { } else {
imagePuller = NewParallelImagePuller(recorder, runtime, imageBackOff) imagePuller = newParallelImagePuller(recorder, runtime, imageBackOff)
} }
return &imageManager{ return &imageManager{
recorder: recorder, recorder: recorder,
runtime: runtime, runtime: runtime,
backOff: backOff, backOff: imageBackOff,
imagePuller: imagePuller, imagePuller: imagePuller,
} }
} }
func (*) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { func (im *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
return imagePuller.pullImage(pod , container pullSecrets) return im.imagePuller.pullImage(pod, container, pullSecrets)
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
) )
@ -31,7 +32,7 @@ import (
// 'image pulled' events correspondingly. // 'image pulled' events correspondingly.
type parallelImagePuller struct { type parallelImagePuller struct {
recorder record.EventRecorder recorder record.EventRecorder
runtime Runtime runtime kubecontainer.Runtime
backOff *flowcontrol.Backoff backOff *flowcontrol.Backoff
} }
@ -40,8 +41,8 @@ var _ imagePuller = &parallelImagePuller{}
// NewImagePuller takes an event recorder and container runtime to create a // NewImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface. // image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { func newParallelImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller {
return &imagePuller{ return &parallelImagePuller{
recorder: recorder, recorder: recorder,
runtime: runtime, runtime: runtime,
backOff: imageBackOff, backOff: imageBackOff,
@ -73,19 +74,19 @@ func (puller *parallelImagePuller) logIt(ref *api.ObjectReference, eventtype, ev
} }
// PullImage pulls the image for the specified pod and container. // PullImage pulls the image for the specified pod and container.
func (puller *parallelImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container) ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil { if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
} }
spec := ImageSpec{container.Image} spec := kubecontainer.ImageSpec{Image: container.Image}
present, err := puller.runtime.IsImagePresent(spec) present, err := puller.runtime.IsImagePresent(spec)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
return ErrImageInspect, msg return kubecontainer.ErrImageInspect, msg
} }
if !shouldPullImage(container, present) { if !shouldPullImage(container, present) {
@ -96,7 +97,7 @@ func (puller *parallelImagePuller) PullImage(pod *api.Pod, container *api.Contai
} else { } else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg return kubecontainer.ErrImageNeverPull, msg
} }
} }
@ -104,17 +105,17 @@ func (puller *parallelImagePuller) PullImage(pod *api.Pod, container *api.Contai
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image) msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg return kubecontainer.ErrImagePullBackOff, msg
} }
puller.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) puller.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
if err := puller.runtime.PullImage(spec, pullSecrets); err != nil { if err := puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable { if err == kubecontainer.RegistryUnavailable {
msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image)
return err, msg return err, msg
} else { } else {
return ErrImagePull, err.Error() return kubecontainer.ErrImagePull, err.Error()
} }
} }
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package container_test package images
import ( import (
"errors" "errors"
@ -106,7 +106,7 @@ func TestPuller(t *testing.T) {
fakeRuntime := &ctest.FakeRuntime{} fakeRuntime := &ctest.FakeRuntime{}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff) puller := newParallelImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}} fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}}
fakeRuntime.Err = c.pullerErr fakeRuntime.Err = c.pullerErr
@ -114,7 +114,7 @@ func TestPuller(t *testing.T) {
for tick, expected := range c.expectedErr { for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil) err, _ := puller.pullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions) fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
} }

View File

@ -23,13 +23,14 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
type imagePullRequest struct { type imagePullRequest struct {
spec ImageSpec spec kubecontainer.ImageSpec
container *api.Container container *api.Container
pullSecrets []api.Secret pullSecrets []api.Secret
logPrefix string logPrefix string
@ -42,7 +43,7 @@ type imagePullRequest struct {
// 'image pulled' events correspondingly. // 'image pulled' events correspondingly.
type serializedImagePuller struct { type serializedImagePuller struct {
recorder record.EventRecorder recorder record.EventRecorder
runtime Runtime runtime kubecontainer.Runtime
backOff *flowcontrol.Backoff backOff *flowcontrol.Backoff
pullRequests chan *imagePullRequest pullRequests chan *imagePullRequest
} }
@ -54,7 +55,7 @@ var _ imagePuller = &serializedImagePuller{}
// image puller that wraps the container runtime's PullImage interface. // image puller that wraps the container runtime's PullImage interface.
// Pulls one image at a time. // Pulls one image at a time.
// Issue #10959 has the rationale behind serializing image pulls. // Issue #10959 has the rationale behind serializing image pulls.
func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { func newSerializedImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller {
imagePuller := &serializedImagePuller{ imagePuller := &serializedImagePuller{
recorder: recorder, recorder: recorder,
runtime: runtime, runtime: runtime,
@ -75,19 +76,19 @@ func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, eventtype,
} }
// PullImage pulls the image for the specified pod and container. // PullImage pulls the image for the specified pod and container.
func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { func (puller *serializedImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container) ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil { if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
} }
spec := ImageSpec{container.Image} spec := kubecontainer.ImageSpec{Image: container.Image}
present, err := puller.runtime.IsImagePresent(spec) present, err := puller.runtime.IsImagePresent(spec)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
return ErrImageInspect, msg return kubecontainer.ErrImageInspect, msg
} }
if !shouldPullImage(container, present) { if !shouldPullImage(container, present) {
@ -98,7 +99,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
} else { } else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg return kubecontainer.ErrImageNeverPull, msg
} }
} }
@ -106,7 +107,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image) msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg return kubecontainer.ErrImagePullBackOff, msg
} }
// enqueue image pull request and wait for response. // enqueue image pull request and wait for response.
@ -122,11 +123,11 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
if err = <-returnChan; err != nil { if err = <-returnChan; err != nil {
puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable { if err == kubecontainer.RegistryUnavailable {
msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image)
return err, msg return err, msg
} else { } else {
return ErrImagePull, err.Error() return kubecontainer.ErrImagePull, err.Error()
} }
} }
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package container_test package images
import ( import (
"errors" "errors"
@ -106,7 +106,7 @@ func TestSerializedPuller(t *testing.T) {
fakeRuntime := &ctest.FakeRuntime{} fakeRuntime := &ctest.FakeRuntime{}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) puller := newSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 0}} fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 0}}
fakeRuntime.Err = c.pullerErr fakeRuntime.Err = c.pullerErr
@ -114,7 +114,7 @@ func TestSerializedPuller(t *testing.T) {
for tick, expected := range c.expectedErr { for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil) err, _ := puller.pullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions) fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
} }

View File

@ -1,9 +1,12 @@
/* /*
Copyright 2016 The Kubernetes Authors All. Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.